PHOENIX-538 Support UDFs (Rajeshbabu Chintaguntla)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/66bd3e35 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/66bd3e35 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/66bd3e35 Branch: refs/heads/master Commit: 66bd3e35c0d2105dcc393116f8bb5851ce1f5ec4 Parents: cd29be2 Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Mon Apr 27 14:03:44 2015 +0530 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Mon Apr 27 14:03:44 2015 +0530 ---------------------------------------------------------------------- bin/phoenix_utils.py | 9 + bin/sqlline.py | 2 +- .../end2end/QueryDatabaseMetaDataIT.java | 5 + .../end2end/TenantSpecificTablesDDLIT.java | 5 + .../phoenix/end2end/UserDefinedFunctionsIT.java | 605 +++ phoenix-core/src/main/antlr3/PhoenixSQL.g | 77 +- .../org/apache/phoenix/cache/GlobalCache.java | 30 +- .../apache/phoenix/compile/ColumnResolver.java | 17 + .../phoenix/compile/CreateFunctionCompiler.java | 80 + .../phoenix/compile/CreateIndexCompiler.java | 2 +- .../apache/phoenix/compile/DeleteCompiler.java | 2 +- .../phoenix/compile/ExpressionCompiler.java | 17 +- .../apache/phoenix/compile/FromCompiler.java | 199 +- .../apache/phoenix/compile/JoinCompiler.java | 9 +- .../apache/phoenix/compile/PostDDLCompiler.java | 14 + .../phoenix/compile/ProjectionCompiler.java | 2 +- .../apache/phoenix/compile/QueryCompiler.java | 18 +- .../apache/phoenix/compile/RowProjector.java | 32 +- .../phoenix/compile/StatementNormalizer.java | 5 +- .../phoenix/compile/SubqueryRewriter.java | 4 +- .../phoenix/compile/SubselectRewriter.java | 2 +- .../coprocessor/MetaDataEndpointImpl.java | 651 ++- .../phoenix/coprocessor/MetaDataProtocol.java | 30 +- .../coprocessor/generated/MetaDataProtos.java | 4274 +++++++++++++++--- .../coprocessor/generated/PFunctionProtos.java | 2942 ++++++++++++ .../phoenix/exception/SQLExceptionCode.java | 20 +- 
.../phoenix/exception/SQLExceptionInfo.java | 16 + .../phoenix/expression/ExpressionType.java | 4 +- .../expression/function/ScalarFunction.java | 2 +- .../expression/function/UDFExpression.java | 220 + .../visitor/CloneExpressionVisitor.java | 6 + .../apache/phoenix/index/IndexMaintainer.java | 50 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 35 +- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 30 + .../apache/phoenix/jdbc/PhoenixStatement.java | 132 +- .../apache/phoenix/optimize/QueryOptimizer.java | 4 +- .../phoenix/parse/CreateFunctionStatement.java | 42 + .../phoenix/parse/CreateIndexStatement.java | 8 +- .../org/apache/phoenix/parse/DMLStatement.java | 11 +- .../apache/phoenix/parse/DeleteStatement.java | 5 +- .../phoenix/parse/DropFunctionStatement.java | 41 + .../apache/phoenix/parse/FunctionParseNode.java | 75 +- .../parse/IndexExpressionParseNodeRewriter.java | 4 +- .../org/apache/phoenix/parse/NamedNode.java | 2 +- .../org/apache/phoenix/parse/PFunction.java | 255 ++ .../apache/phoenix/parse/ParseNodeFactory.java | 70 +- .../apache/phoenix/parse/ParseNodeRewriter.java | 2 +- .../apache/phoenix/parse/SelectStatement.java | 22 +- .../org/apache/phoenix/parse/UDFParseNode.java | 27 + .../apache/phoenix/parse/UpsertStatement.java | 9 +- .../apache/phoenix/protobuf/ProtobufUtil.java | 10 + .../phoenix/query/ConnectionQueryServices.java | 4 + .../query/ConnectionQueryServicesImpl.java | 161 +- .../query/ConnectionlessQueryServicesImpl.java | 52 +- .../query/DelegateConnectionQueryServices.java | 31 + .../apache/phoenix/query/MetaDataMutated.java | 3 + .../apache/phoenix/query/QueryConstants.java | 36 + .../org/apache/phoenix/query/QueryServices.java | 2 + .../phoenix/query/QueryServicesOptions.java | 4 +- .../schema/FunctionAlreadyExistsException.java | 58 + .../schema/FunctionNotFoundException.java | 52 + .../apache/phoenix/schema/MetaDataClient.java | 256 +- .../NewerFunctionAlreadyExistsException.java | 39 + 
.../org/apache/phoenix/schema/PMetaData.java | 6 +- .../apache/phoenix/schema/PMetaDataEntity.java | 22 + .../apache/phoenix/schema/PMetaDataImpl.java | 118 +- .../java/org/apache/phoenix/schema/PTable.java | 3 +- .../org/apache/phoenix/util/MetaDataUtil.java | 7 +- .../org/apache/phoenix/util/SchemaUtil.java | 17 +- .../apache/phoenix/parse/QueryParserTest.java | 18 - .../query/ParallelIteratorsSplitTest.java | 15 + phoenix-protocol/src/main/MetaDataService.proto | 37 +- phoenix-protocol/src/main/PFunction.proto | 45 + 73 files changed, 10222 insertions(+), 899 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/bin/phoenix_utils.py ---------------------------------------------------------------------- diff --git a/bin/phoenix_utils.py b/bin/phoenix_utils.py index 36c7b82..2cf7db7 100755 --- a/bin/phoenix_utils.py +++ b/bin/phoenix_utils.py @@ -64,6 +64,15 @@ def setPath(): phoenix_client_jar = find("phoenix-*-client.jar", phoenix_jar_path) global phoenix_test_jar_path phoenix_test_jar_path = os.path.join(current_dir, "..", "phoenix-core", "target","*") + global hadoop_common_jar_path + hadoop_common_jar_path = os.path.join(current_dir, "..", "phoenix-assembly", "target","*") + global hadoop_common_jar + hadoop_common_jar = find("hadoop-common*.jar", hadoop_common_jar_path) + global hadoop_hdfs_jar_path + hadoop_hdfs_jar_path = os.path.join(current_dir, "..", "phoenix-assembly", "target","*") + global hadoop_hdfs_jar + hadoop_hdfs_jar = find("hadoop-hdfs*.jar", hadoop_hdfs_jar_path) + global hbase_conf_dir hbase_conf_dir = os.getenv('HBASE_CONF_DIR', os.getenv('HBASE_CONF_PATH', '.')) global hbase_conf_path # keep conf_path around for backward compatibility http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/bin/sqlline.py ---------------------------------------------------------------------- diff --git a/bin/sqlline.py b/bin/sqlline.py index 6e5b5fa..80b5ff7 
100755 --- a/bin/sqlline.py +++ b/bin/sqlline.py @@ -53,7 +53,7 @@ colorSetting = "true" if os.name == 'nt': colorSetting = "false" -java_cmd = 'java -cp "' + phoenix_utils.hbase_conf_dir + os.pathsep + phoenix_utils.phoenix_client_jar + \ +java_cmd = 'java -cp "' + phoenix_utils.hbase_conf_dir + os.pathsep + phoenix_utils.phoenix_client_jar + os.pathsep + phoenix_utils.hadoop_common_jar + os.pathsep + phoenix_utils.hadoop_hdfs_jar + \ '" -Dlog4j.configuration=file:' + \ os.path.join(phoenix_utils.current_dir, "log4j.properties") + \ " sqlline.SqlLine -d org.apache.phoenix.jdbc.PhoenixDriver \ http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java index c9ec0ce..61459a5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java @@ -19,6 +19,7 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE; import static org.apache.phoenix.util.TestUtil.ATABLE_NAME; import static org.apache.phoenix.util.TestUtil.ATABLE_SCHEMA_NAME; @@ -125,6 +126,10 @@ public class QueryDatabaseMetaDataIT extends BaseClientManagedTimeIT { assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE")); assertTrue(rs.next()); assertEquals(rs.getString("TABLE_SCHEM"),SYSTEM_CATALOG_SCHEMA); + 
assertEquals(rs.getString("TABLE_NAME"),SYSTEM_FUNCTION_TABLE); + assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE")); + assertTrue(rs.next()); + assertEquals(rs.getString("TABLE_SCHEM"),SYSTEM_CATALOG_SCHEMA); assertEquals(rs.getString("TABLE_NAME"),TYPE_SEQUENCE); assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE")); assertTrue(rs.next()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java index 4d0b45d..a7c7291 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java @@ -26,6 +26,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE; import static org.apache.phoenix.schema.PTableType.SYSTEM; import static org.apache.phoenix.schema.PTableType.TABLE; @@ -473,6 +474,8 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT { assertTrue(rs.next()); assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, SYSTEM_CATALOG_TABLE, SYSTEM); assertTrue(rs.next()); + assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, SYSTEM_FUNCTION_TABLE, SYSTEM); + assertTrue(rs.next()); assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, TYPE_SEQUENCE, SYSTEM); 
assertTrue(rs.next()); assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, SYSTEM); @@ -539,6 +542,8 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT { assertTrue(rs.next()); assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE, PTableType.SYSTEM); assertTrue(rs.next()); + assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, SYSTEM_FUNCTION_TABLE, SYSTEM); + assertTrue(rs.next()); assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE, PTableType.SYSTEM); assertTrue(rs.next()); assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, PTableType.SYSTEM); http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java new file mode 100644 index 0000000..d56004b --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java @@ -0,0 +1,605 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; +import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE; +import static org.apache.phoenix.query.QueryServices.DYNAMIC_JARS_DIR_KEY; +import static org.apache.phoenix.util.TestUtil.LOCALHOST; +import static org.junit.Assert.*; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Map; +import java.util.Properties; +import java.util.jar.Attributes; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; + +import javax.tools.JavaCompiler; +import javax.tools.ToolProvider; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.io.IOUtils; +import org.apache.phoenix.expression.function.UDFExpression; +import 
org.apache.phoenix.jdbc.PhoenixTestDriver; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.FunctionAlreadyExistsException; +import org.apache.phoenix.schema.FunctionNotFoundException; +import org.apache.phoenix.schema.ValueRangeExcpetion; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class UserDefinedFunctionsIT extends BaseTest{ + + protected static final String TENANT_ID = "ZZTop"; + private static String url; + private static PhoenixTestDriver driver; + private static HBaseTestingUtility util; + + private static String STRING_REVERSE_EVALUATE_METHOD = + new StringBuffer() + .append(" public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {\n") + .append(" Expression arg = getChildren().get(0);\n") + .append(" if (!arg.evaluate(tuple, ptr)) {\n") + .append(" return false;\n") + .append(" }\n") + .append(" int targetOffset = ptr.getLength();\n") + .append(" if (targetOffset == 0) {\n") + .append(" return true;\n") + .append(" }\n") + .append(" byte[] source = ptr.get();\n") + .append(" byte[] target = new byte[targetOffset];\n") + .append(" int sourceOffset = ptr.getOffset(); \n") + .append(" int endOffset = sourceOffset + ptr.getLength();\n") + .append(" SortOrder sortOrder = arg.getSortOrder();\n") + .append(" while (sourceOffset < endOffset) {\n") + .append(" int nBytes = StringUtil.getBytesInChar(source[sourceOffset], sortOrder);\n") + .append(" targetOffset -= nBytes;\n") + .append(" System.arraycopy(source, sourceOffset, target, targetOffset, nBytes);\n") + .append(" sourceOffset += nBytes;\n") + .append(" }\n") + .append(" ptr.set(target);\n") + .append(" return true;\n") + .append(" }\n").toString(); + + private static String 
SUM_COLUMN_VALUES_EVALUATE_METHOD = + new StringBuffer() + .append(" public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {\n") + .append(" int[] array = new int[getChildren().size()];\n") + .append(" int i = 0;\n") + .append(" for(Expression child:getChildren()) {\n") + .append(" if (!child.evaluate(tuple, ptr)) {\n") + .append(" return false;\n") + .append(" }\n") + .append(" int targetOffset = ptr.getLength();\n") + .append(" if (targetOffset == 0) {\n") + .append(" return true;\n") + .append(" }\n") + .append(" array[i++] = (Integer) PInteger.INSTANCE.toObject(ptr);\n") + .append(" }\n") + .append(" int sum = 0;\n") + .append(" for(i=0;i<getChildren().size();i++) {\n") + .append(" sum+=array[i];\n") + .append(" }\n") + .append(" ptr.set(PInteger.INSTANCE.toBytes((Integer)sum));\n") + .append(" return true;\n") + .append(" }\n").toString(); + + private static String MY_REVERSE_CLASS_NAME = "MyReverse"; + private static String MY_SUM_CLASS_NAME = "MySum"; + private static String MY_REVERSE_PROGRAM = getProgram(MY_REVERSE_CLASS_NAME, STRING_REVERSE_EVALUATE_METHOD, "PVarchar"); + private static String MY_SUM_PROGRAM = getProgram(MY_SUM_CLASS_NAME, SUM_COLUMN_VALUES_EVALUATE_METHOD, "PInteger"); + private static Properties EMPTY_PROPS = new Properties(); + + + private static String getProgram(String className, String evaluateMethod, String returnType) { + return new StringBuffer() + .append("package org.apache.phoenix.end2end;\n") + .append("import java.sql.SQLException;\n") + .append("import java.sql.SQLException;\n") + .append("import java.util.List;\n") + .append("import org.apache.hadoop.hbase.io.ImmutableBytesWritable;\n") + .append("import org.apache.phoenix.expression.Expression;\n") + .append("import org.apache.phoenix.expression.function.ScalarFunction;\n") + .append("import org.apache.phoenix.schema.SortOrder;\n") + .append("import org.apache.phoenix.schema.tuple.Tuple;\n") + .append("import org.apache.phoenix.schema.types.PDataType;\n") + 
.append("import org.apache.phoenix.schema.types.PInteger;\n") + .append("import org.apache.phoenix.schema.types.PVarchar;\n") + .append("import org.apache.phoenix.util.StringUtil;\n") + .append("public class "+className+" extends ScalarFunction{\n") + .append(" public static final String NAME = \"MY_REVERSE\";\n") + .append(" public "+className+"() {\n") + .append(" }\n") + .append(" public "+className+"(List<Expression> children) throws SQLException {\n") + .append(" super(children);\n") + .append(" }\n") + .append(" @Override\n") + .append(evaluateMethod) + .append(" @Override\n") + .append(" public SortOrder getSortOrder() {\n") + .append(" return getChildren().get(0).getSortOrder();\n") + .append(" }\n") + .append(" @Override\n") + .append(" public PDataType getDataType() {\n") + .append(" return "+returnType+".INSTANCE;\n") + .append(" }\n") + .append(" @Override\n") + .append(" public String getName() {\n") + .append(" return NAME;\n") + .append(" }\n") + .append("}\n").toString(); + } + + @BeforeClass + public static void doSetup() throws Exception { + Configuration conf = HBaseConfiguration.create(); + setUpConfigForMiniCluster(conf); + util = new HBaseTestingUtility(conf); + util.startMiniDFSCluster(1); + util.startMiniZKCluster(1); + String string = util.getConfiguration().get("fs.defaultFS"); + conf.set(DYNAMIC_JARS_DIR_KEY, string+"/hbase/tmpjars"); + util.startMiniHBaseCluster(1, 1); + UDFExpression.setConfig(conf); + compileTestClass(MY_REVERSE_CLASS_NAME, MY_REVERSE_PROGRAM, 1); + compileTestClass(MY_SUM_CLASS_NAME, MY_SUM_PROGRAM, 2); + String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB); + url = + JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + + clientPort + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM; + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + props.put(QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB, "true"); + driver = 
initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testCreateFunction() throws Exception { + Connection conn = driver.connect(url, EMPTY_PROPS); + Statement stmt = conn.createStatement(); + conn.createStatement().execute("create table t(k integer primary key, firstname varchar, lastname varchar)"); + stmt.execute("upsert into t values(1,'foo','jock')"); + conn.commit(); + stmt.execute("create function myreverse(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'"); + ResultSet rs = stmt.executeQuery("select myreverse(firstname) from t"); + assertTrue(rs.next()); + assertEquals("oof", rs.getString(1)); + assertFalse(rs.next()); + rs = stmt.executeQuery("select * from t where myreverse(firstname)='oof'"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("foo", rs.getString(2)); + assertEquals("jock", rs.getString(3)); + assertFalse(rs.next()); + + try { + stmt.execute("create function myreverse(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'"); + fail("Duplicate function should not be created."); + } catch(FunctionAlreadyExistsException e) { + } + // without specifying the jar should pick the class from path of hbase.dynamic.jars.dir configuration. 
+ stmt.execute("create function myreverse2(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"'"); + rs = stmt.executeQuery("select myreverse2(firstname) from t"); + assertTrue(rs.next()); + assertEquals("oof", rs.getString(1)); + assertFalse(rs.next()); + rs = stmt.executeQuery("select * from t where myreverse2(firstname)='oof'"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("foo", rs.getString(2)); + assertEquals("jock", rs.getString(3)); + assertFalse(rs.next()); + conn.createStatement().execute("create table t3(tenant_id varchar not null, k integer not null, firstname varchar, lastname varchar constraint pk primary key(tenant_id,k)) MULTI_TENANT=true"); + // Function created with global id should be accessible. + Connection conn2 = driver.connect(url+";"+PhoenixRuntime.TENANT_ID_ATTRIB+"="+TENANT_ID, EMPTY_PROPS); + try { + conn2.createStatement().execute("upsert into t3 values(1,'foo','jock')"); + conn2.commit(); + conn2.createStatement().execute("create function myreverse(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'"); + rs = conn2.createStatement().executeQuery("select myreverse(firstname) from t3"); + assertTrue(rs.next()); + assertEquals("oof", rs.getString(1)); + } catch(FunctionAlreadyExistsException e) { + fail("FunctionAlreadyExistsException should not be thrown"); + } + // calling global udf on tenant specific connection. 
+ rs = conn2.createStatement().executeQuery("select myreverse2(firstname) from t3"); + assertTrue(rs.next()); + assertEquals("oof", rs.getString(1)); + try { + conn2.createStatement().execute("drop function myreverse2"); + fail("FunctionNotFoundException should be thrown"); + } catch(FunctionNotFoundException e){ + + } + conn.createStatement().execute("drop function myreverse2"); + try { + rs = conn2.createStatement().executeQuery("select myreverse2(firstname) from t3"); + fail("FunctionNotFoundException should be thrown."); + } catch(FunctionNotFoundException e){ + + } + try{ + rs = conn2.createStatement().executeQuery("select unknownFunction(firstname) from t3"); + fail("FunctionNotFoundException should be thrown."); + } catch(FunctionNotFoundException e) { + + } + } + + @Test + public void testSameUDFWithDifferentImplementationsInDifferentTenantConnections() throws Exception { + Connection nonTenantConn = driver.connect(url, EMPTY_PROPS); + nonTenantConn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'"); + try { + nonTenantConn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end.UnknownClass' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'"); + fail("FunctionAlreadyExistsException should be thrown."); + } catch(FunctionAlreadyExistsException e) { + + } + String tenantId1="tenId1"; + String tenantId2="tenId2"; + nonTenantConn.createStatement().execute("create table t7(tenant_id varchar not null, k integer not null, k1 integer, name varchar constraint pk primary key(tenant_id, k)) multi_tenant=true"); + Connection tenant1Conn = driver.connect(url+";"+PhoenixRuntime.TENANT_ID_ATTRIB+"="+tenantId1, EMPTY_PROPS); + Connection tenant2Conn = 
driver.connect(url+";"+PhoenixRuntime.TENANT_ID_ATTRIB+"="+tenantId2, EMPTY_PROPS); + tenant1Conn.createStatement().execute("upsert into t7 values(1,1,'jock')"); + tenant1Conn.commit(); + tenant2Conn.createStatement().execute("upsert into t7 values(1,2,'jock')"); + tenant2Conn.commit(); + tenant1Conn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'"); + try { + tenant1Conn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end.UnknownClass' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'"); + fail("FunctionAlreadyExistsException should be thrown."); + } catch(FunctionAlreadyExistsException e) { + + } + + tenant2Conn.createStatement().execute("create function myfunction(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'"); + try { + tenant2Conn.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end.UnknownClass' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/unknown.jar"+"'"); + fail("FunctionAlreadyExistsException should be thrown."); + } catch(FunctionAlreadyExistsException e) { + + } + + ResultSet rs = tenant1Conn.createStatement().executeQuery("select MYFUNCTION(name) from t7"); + assertTrue(rs.next()); + assertEquals("kcoj", rs.getString(1)); + assertFalse(rs.next()); + rs = tenant1Conn.createStatement().executeQuery("select * from t7 where MYFUNCTION(name)='kcoj'"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(1, rs.getInt(2)); + assertEquals("jock", rs.getString(3)); + assertFalse(rs.next()); + + rs = 
tenant2Conn.createStatement().executeQuery("select MYFUNCTION(k) from t7"); + assertTrue(rs.next()); + assertEquals(11, rs.getInt(1)); + assertFalse(rs.next()); + rs = tenant2Conn.createStatement().executeQuery("select * from t7 where MYFUNCTION(k1)=12"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(2, rs.getInt(2)); + assertEquals("jock", rs.getString(3)); + assertFalse(rs.next()); + } + + @Test + public void testUDFsWithMultipleConnections() throws Exception { + Connection conn1 = driver.connect(url, EMPTY_PROPS); + conn1.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'"); + Connection conn2 = driver.connect(url, EMPTY_PROPS); + try{ + conn2.createStatement().execute("create function myfunction(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'"); + fail("FunctionAlreadyExistsException should be thrown."); + } catch(FunctionAlreadyExistsException e) { + + } + conn2.createStatement().execute("create table t8(k integer not null primary key, k1 integer, name varchar)"); + conn2.createStatement().execute("upsert into t8 values(1,1,'jock')"); + conn2.commit(); + ResultSet rs = conn2.createStatement().executeQuery("select MYFUNCTION(name) from t8"); + assertTrue(rs.next()); + assertEquals("kcoj", rs.getString(1)); + assertFalse(rs.next()); + rs = conn2.createStatement().executeQuery("select * from t8 where MYFUNCTION(name)='kcoj'"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(1, rs.getInt(2)); + assertEquals("jock", rs.getString(3)); + assertFalse(rs.next()); + conn2.createStatement().execute("drop function MYFUNCTION"); + try { + rs = conn1.createStatement().executeQuery("select MYFUNCTION(name) from t8"); + 
fail("FunctionNotFoundException should be thrown"); + } catch(FunctionNotFoundException e) { + + } + } + @Test + public void testUsingUDFFunctionInDifferentQueries() throws Exception { + Connection conn = driver.connect(url, EMPTY_PROPS); + Statement stmt = conn.createStatement(); + conn.createStatement().execute("create table t1(k integer primary key, firstname varchar, lastname varchar)"); + stmt.execute("upsert into t1 values(1,'foo','jock')"); + conn.commit(); + conn.createStatement().execute("create table t2(k integer primary key, k1 integer, lastname_reverse varchar)"); + conn.commit(); + stmt.execute("create function mysum3(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'"); + stmt.execute("create function myreverse3(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar1.jar"+"'"); + stmt.execute("upsert into t2(k,k1,lastname_reverse) select mysum3(k),mysum3(k,11),myreverse3(lastname) from t1"); + conn.commit(); + ResultSet rs = stmt.executeQuery("select * from t2"); + assertTrue(rs.next()); + assertEquals(11, rs.getInt(1)); + assertEquals(12, rs.getInt(2)); + assertEquals("kcoj", rs.getString(3)); + assertFalse(rs.next()); + stmt.execute("delete from t2 where myreverse3(lastname_reverse)='jock' and mysum3(k)=21"); + conn.commit(); + rs = stmt.executeQuery("select * from t2"); + assertFalse(rs.next()); + stmt.execute("create function myreverse4(VARCHAR CONSTANT defaultValue='null') returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"'"); + stmt.execute("upsert into t2 values(11,12,myreverse4('jock'))"); + conn.commit(); + rs = stmt.executeQuery("select * from t2"); + assertTrue(rs.next()); + assertEquals(11, rs.getInt(1)); + assertEquals(12, 
rs.getInt(2)); + assertEquals("kcoj", rs.getString(3)); + assertFalse(rs.next()); + } + + @Test + public void testVerifyCreateFunctionArguments() throws Exception { + Connection conn = driver.connect(url, EMPTY_PROPS); + Statement stmt = conn.createStatement(); + conn.createStatement().execute("create table t4(k integer primary key, k1 integer, lastname varchar)"); + stmt.execute("upsert into t4 values(1,1,'jock')"); + conn.commit(); + stmt.execute("create function mysum(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'"); + ResultSet rs = stmt.executeQuery("select mysum(k,12) from t4"); + assertTrue(rs.next()); + assertEquals(13, rs.getInt(1)); + rs = stmt.executeQuery("select mysum(k) from t4"); + assertTrue(rs.next()); + assertEquals(11, rs.getInt(1)); + try { + stmt.executeQuery("select mysum(k,20) from t4"); + fail("Value Range Exception should be thrown."); + } catch(ValueRangeExcpetion e) { + + } + } + + @Test + public void testTemporaryFunctions() throws Exception { + Connection conn = driver.connect(url, EMPTY_PROPS); + Statement stmt = conn.createStatement(); + conn.createStatement().execute("create table t9(k integer primary key, k1 integer, lastname varchar)"); + stmt.execute("upsert into t9 values(1,1,'jock')"); + conn.commit(); + stmt.execute("create temporary function mysum9(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'"); + ResultSet rs = stmt.executeQuery("select mysum9(k,12) from t9"); + assertTrue(rs.next()); + assertEquals(13, rs.getInt(1)); + rs = stmt.executeQuery("select mysum9(k) from t9"); + assertTrue(rs.next()); + assertEquals(11, rs.getInt(1)); + rs = stmt.executeQuery("select 
k from t9 where mysum9(k)=11"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + stmt.execute("drop function mysum9"); + try { + rs = stmt.executeQuery("select k from t9 where mysum9(k)=11"); + fail("FunctionNotFoundException should be thrown"); + } catch(FunctionNotFoundException e){ + + } + } + + @Test + public void testDropFunction() throws Exception { + Connection conn = driver.connect(url, EMPTY_PROPS); + Statement stmt = conn.createStatement(); + String query = "select count(*) from "+ SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\""; + ResultSet rs = stmt.executeQuery(query); + rs.next(); + int numRowsBefore = rs.getInt(1); + stmt.execute("create function mysum6(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar " + + "'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'"); + rs = stmt.executeQuery(query); + rs.next(); + int numRowsAfter= rs.getInt(1); + assertEquals(3, numRowsAfter - numRowsBefore); + stmt.execute("drop function mysum6"); + rs = stmt.executeQuery(query); + rs.next(); + assertEquals(numRowsBefore, rs.getInt(1)); + conn.createStatement().execute("create table t6(k integer primary key, k1 integer, lastname varchar)"); + try { + rs = stmt.executeQuery("select mysum6(k1) from t6"); + fail("FunctionNotFoundException should be thrown"); + } catch(FunctionNotFoundException e) { + + } + try { + stmt.execute("drop function mysum6"); + fail("FunctionNotFoundException should be thrown"); + } catch(FunctionNotFoundException e) { + + } + try { + stmt.execute("drop function if exists mysum6"); + } catch(FunctionNotFoundException e) { + fail("FunctionNotFoundException should not be thrown"); + } + stmt.execute("create function mysum6(INTEGER, INTEGER CONSTANT defaultValue='10' minvalue='1' maxvalue='15' ) returns INTEGER as 'org.apache.phoenix.end2end."+MY_SUM_CLASS_NAME+"' using jar " + + 
"'"+util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY) + "/myjar2.jar"+"'"); + try { + rs = stmt.executeQuery("select mysum6(k1) from t6"); + } catch(FunctionNotFoundException e) { + fail("FunctionNotFoundException should not be thrown"); + } + } + + @Test + public void testFunctionalIndexesWithUDFFunction() throws Exception { + Connection conn = driver.connect(url, EMPTY_PROPS); + Statement stmt = conn.createStatement(); + stmt.execute("create table t5(k integer primary key, k1 integer, lastname_reverse varchar)"); + stmt.execute("create function myreverse5(VARCHAR) returns VARCHAR as 'org.apache.phoenix.end2end."+MY_REVERSE_CLASS_NAME+"'"); + stmt.execute("upsert into t5 values(1,1,'jock')"); + conn.commit(); + stmt.execute("create index idx on t5(myreverse5(lastname_reverse))"); + String query = "select myreverse5(lastname_reverse) from t5"; + ResultSet rs = stmt.executeQuery("explain " + query); + assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER IDX\n" + + " SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs)); + rs = stmt.executeQuery(query); + assertTrue(rs.next()); + assertEquals("kcoj", rs.getString(1)); + assertFalse(rs.next()); + stmt.execute("create local index idx2 on t5(myreverse5(lastname_reverse))"); + query = "select k,k1,myreverse5(lastname_reverse) from t5 where myreverse5(lastname_reverse)='kcoj'"; + rs = stmt.executeQuery("explain " + query); + assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_T5 [-32768,'kcoj']\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + +"CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); + rs = stmt.executeQuery(query); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(1, rs.getInt(2)); + assertEquals("kcoj", rs.getString(3)); + assertFalse(rs.next()); + } + + @AfterClass + public static void tearDown() throws Exception { + try { + destroyDriver(driver); + } finally { + util.shutdownMiniCluster(); + } + } + + /** + * Compiles the test class with bogus code into a .class file. 
+ */ + private static void compileTestClass(String className, String program, int counter) throws Exception { + String javaFileName = className+".java"; + File javaFile = new File(javaFileName); + String classFileName = className+".class"; + File classFile = new File(classFileName); + String jarName = "myjar"+counter+".jar"; + String jarPath = "." + File.separator + jarName; + File jarFile = new File(jarPath); + try { + String packageName = "org.apache.phoenix.end2end"; + FileOutputStream fos = new FileOutputStream(javaFileName); + fos.write(program.getBytes()); + fos.close(); + + JavaCompiler jc = ToolProvider.getSystemJavaCompiler(); + int result = jc.run(null, null, null, javaFileName); + assertEquals(0, result); + + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); + FileOutputStream jarFos = new FileOutputStream(jarPath); + JarOutputStream jarOutputStream = new JarOutputStream(jarFos, manifest); + String pathToAdd =packageName.replace('.', File.separatorChar) + + File.separator; + jarOutputStream.putNextEntry(new JarEntry(pathToAdd)); + jarOutputStream.closeEntry(); + jarOutputStream.putNextEntry(new JarEntry(pathToAdd + classFile.getName())); + byte[] allBytes = new byte[(int) classFile.length()]; + FileInputStream fis = new FileInputStream(classFile); + fis.read(allBytes); + fis.close(); + jarOutputStream.write(allBytes); + jarOutputStream.closeEntry(); + jarOutputStream.close(); + jarFos.close(); + + assertTrue(jarFile.exists()); + + InputStream inputStream = new BufferedInputStream(new FileInputStream(jarPath)); + FileSystem fs = util.getDefaultRootDirPath().getFileSystem(util.getConfiguration()); + Path jarsLocation = new Path(util.getConfiguration().get(DYNAMIC_JARS_DIR_KEY)); + Path myJarPath; + if (jarsLocation.toString().endsWith("/")) { + myJarPath = new Path(jarsLocation.toString() + jarName); + } else { + myJarPath = new Path(jarsLocation.toString() + "/" + jarName); + } + OutputStream 
outputStream = fs.create(myJarPath); + try { + IOUtils.copyBytes(inputStream, outputStream, 4096, false); + } finally { + IOUtils.closeStream(inputStream); + IOUtils.closeStream(outputStream); + } + } finally { + if (javaFile != null) javaFile.delete(); + if (classFile != null) classFile.delete(); + if (jarFile != null) jarFile.delete(); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/antlr3/PhoenixSQL.g ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g index f57c5cc..d2bb241 100644 --- a/phoenix-core/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g @@ -114,6 +114,14 @@ tokens ASYNC='async'; SAMPLING='sampling'; UNION='union'; + FUNCTION='function'; + AS='as'; + TEMPORARY='temporary'; + RETURNS='returns'; + USING='using'; + JAR='jar'; + DEFAULTVALUE='defaultvalue'; + CONSTANT = 'constant'; } @@ -144,13 +152,18 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import java.lang.Boolean; import java.math.BigDecimal; import java.util.Arrays; import java.util.Collections; import java.util.Stack; import java.sql.SQLException; import org.apache.phoenix.expression.function.CountAggregateFunction; +import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.parse.PFunction; +import org.apache.phoenix.parse.PFunction.FunctionArgument; +import org.apache.phoenix.parse.UDFParseNode; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.IllegalDataException; @@ -206,6 +219,7 @@ package org.apache.phoenix.parse; private int anonBindNum; private ParseNodeFactory factory; private 
ParseContext.Stack contextStack = new ParseContext.Stack(); + private Map<String, UDFParseNode> udfParseNodes = new HashMap<String, UDFParseNode>(1); public void setParseNodeFactory(ParseNodeFactory factory) { this.factory = factory; @@ -341,13 +355,25 @@ package org.apache.phoenix.parse; // Used to incrementally parse a series of semicolon-terminated SQL statement // Note than unlike the rule below an EOF is not expected at the end. nextStatement returns [BindableStatement ret] - : s=oneStatement {$ret = s;} SEMICOLON + : s=oneStatement { + try { + $ret = s; + } finally { + udfParseNodes.clear(); + } + } SEMICOLON | EOF ; // Parses a single SQL statement (expects an EOF after the select statement). statement returns [BindableStatement ret] - : s=oneStatement {$ret = s;} EOF + : s=oneStatement { + try { + $ret = s; + } finally { + udfParseNodes.clear(); + } + } EOF ; // Parses a select statement which must be the only statement (expects an EOF after the statement). @@ -369,6 +395,8 @@ oneStatement returns [BindableStatement ret] | s=alter_index_node | s=alter_table_node | s=trace_node + | s=create_function_node + | s=drop_function_node | s=alter_session_node | s=create_sequence_node | s=drop_sequence_node @@ -409,7 +437,7 @@ create_index_node returns [CreateIndexStatement ret] (async=ASYNC)? (p=fam_properties)? (SPLIT ON v=value_expression_list)? - {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount()); } + {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); } ; // Parse a create sequence statement. @@ -510,6 +538,25 @@ trace_node returns [TraceStatement ret] {ret = factory.trace(Tracing.isTraceOn(flag.getText()), s == null ? Tracing.isTraceOn(flag.getText()) ? 
1.0 : 0.0 : (((BigDecimal)s.getValue())).doubleValue());} ; +// Parse a trace statement. +create_function_node returns [CreateFunctionStatement ret] + : CREATE (temp=TEMPORARY)? FUNCTION function=identifier + (LPAREN args=zero_or_more_data_types RPAREN) + RETURNS r=identifier AS (className= jar_path) + (USING JAR (jarPath = jar_path))? + { + $ret = factory.createFunction(new PFunction(SchemaUtil.normalizeIdentifier(function), args,r,(String)className.getValue(), jarPath == null ? null : (String)jarPath.getValue()), temp!=null);; + } + ; + +jar_path returns [LiteralParseNode ret] + : l=literal { $ret = l; } + ; + +drop_function_node returns [DropFunctionStatement ret] + : DROP FUNCTION (IF ex=EXISTS)? function=identifier {$ret = factory.dropFunction(SchemaUtil.normalizeIdentifier(function), ex!=null);} + ; + // Parse an alter session statement. alter_session_node returns [AlterSessionStatement ret] : ALTER SESSION (SET p=properties) @@ -586,7 +633,7 @@ single_select returns [SelectStatement ret] (WHERE where=expression)? (GROUP BY group=group_by)? (HAVING having=expression)? - { ParseContext context = contextStack.peek(); $ret = factory.select(from, h, d!=null, sel, where, group, having, null, null, getBindCount(), context.isAggregate(), context.hasSequences(), null); } + { ParseContext context = contextStack.peek(); $ret = factory.select(from, h, d!=null, sel, where, group, having, null, null, getBindCount(), context.isAggregate(), context.hasSequences(), null, new HashMap<String,UDFParseNode>(udfParseNodes)); } ; finally{ contextStack.pop(); } @@ -610,7 +657,7 @@ upsert_node returns [UpsertStatement ret] : UPSERT (hint=hintClause)? INTO t=from_table_name (LPAREN p=upsert_column_refs RPAREN)? ((VALUES LPAREN v=one_or_more_expressions RPAREN) | s=select_node) - {ret = factory.upsert(factory.namedTable(null,t,p == null ? null : p.getFirst()), hint, p == null ? null : p.getSecond(), v, s, getBindCount()); } + {ret = factory.upsert(factory.namedTable(null,t,p == null ? 
null : p.getFirst()), hint, p == null ? null : p.getSecond(), v, s, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); } ; upsert_column_refs returns [Pair<List<ColumnDef>,List<ColumnName>> ret] @@ -625,7 +672,7 @@ delete_node returns [DeleteStatement ret] (WHERE v=expression)? (ORDER BY order=order_by)? (LIMIT l=limit)? - {ret = factory.delete(factory.namedTable(null,t), hint, v, order, l, getBindCount()); } + {ret = factory.delete(factory.namedTable(null,t), hint, v, order, l, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); } ; limit returns [LimitNode ret] @@ -813,17 +860,19 @@ term returns [ParseNode ret] if (!contextStack.isEmpty()) { contextStack.peek().setAggregate(f.isAggregate()); } + if(f instanceof UDFParseNode) udfParseNodes.put(f.getName(),(UDFParseNode)f); $ret = f; } | field=identifier LPAREN t=ASTERISK RPAREN { if (!isCountFunction(field)) { - throwRecognitionException(t); + throwRecognitionException(t); } FunctionParseNode f = factory.function(field, LiteralParseNode.STAR); if (!contextStack.isEmpty()) { contextStack.peek().setAggregate(f.isAggregate()); } + if(f instanceof UDFParseNode) udfParseNodes.put(f.getName(),(UDFParseNode)f); $ret = f; } | field=identifier LPAREN t=DISTINCT l=zero_or_more_expressions RPAREN @@ -832,6 +881,7 @@ term returns [ParseNode ret] if (!contextStack.isEmpty()) { contextStack.peek().setAggregate(f.isAggregate()); } + if(f instanceof UDFParseNode) udfParseNodes.put(f.getName(),(UDFParseNode)f); $ret = f; } | e=case_statement { $ret = e; } @@ -865,6 +915,19 @@ zero_or_more_expressions returns [List<ParseNode> ret] : (v = expression {$ret.add(v);})? (COMMA v = expression {$ret.add(v);} )* ; +zero_or_more_data_types returns [List<FunctionArgument> ret] +@init{ret = new ArrayList<FunctionArgument>(); } + : (dt = identifier (LPAREN l=NUMBER (COMMA s=NUMBER)? RPAREN)? ar=ARRAY? (lsq=LSQUARE (a=NUMBER)? RSQUARE)? (c = CONSTANT)? (DEFAULTVALUE EQ dv = value_expression)? 
(MINVALUE EQ minv = value_expression)? (MAXVALUE EQ maxv = value_expression)? + {$ret.add(new FunctionArgument(dt, ar != null || lsq != null, c!=null, + dv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)dv).getValue()), + minv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)minv).getValue()), + maxv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)maxv).getValue())));})? (COMMA (dt = identifier (LPAREN l=NUMBER (COMMA s=NUMBER)? RPAREN)? ar=ARRAY? (lsq=LSQUARE (a=NUMBER)? RSQUARE)? (c = CONSTANT)? (DEFAULTVALUE EQ dv = value_expression)? (MINVALUE EQ minv = value_expression)? (MAXVALUE EQ maxv = value_expression)? + {$ret.add(new FunctionArgument(dt, ar != null || lsq != null, c!=null, + dv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)dv).getValue()), + minv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)minv).getValue()), + maxv == null ? null : LiteralExpression.newConstant(((LiteralParseNode)maxv).getValue())));} ))* +; + value_expression_list returns [List<ParseNode> ret] @init{ret = new ArrayList<ParseNode>(); } : LPAREN e = value_expression {$ret.add(e);} (COMMA e = value_expression {$ret.add(e);} )* RPAREN http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java index fcef0ec..643112d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java @@ -32,8 +32,10 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.memory.ChildMemoryManager; import 
org.apache.phoenix.memory.GlobalMemoryManager; +import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PMetaDataEntity; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.SizedUtil; @@ -57,17 +59,17 @@ public class GlobalCache extends TenantCacheImpl { // TODO: Use Guava cache with auto removal after lack of access private final ConcurrentMap<ImmutableBytesWritable,TenantCache> perTenantCacheMap = new ConcurrentHashMap<ImmutableBytesWritable,TenantCache>(); // Cache for lastest PTable for a given Phoenix table - private Cache<ImmutableBytesPtr,PTable> metaDataCache; + private Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache; public void clearTenantCache() { perTenantCacheMap.clear(); } - public Cache<ImmutableBytesPtr,PTable> getMetaDataCache() { + public Cache<ImmutableBytesPtr,PMetaDataEntity> getMetaDataCache() { // Lazy initialize QueryServices so that we only attempt to create an HBase Configuration // object upon the first attempt to connect to any cluster. Otherwise, an attempt will be // made at driver initialization time which is too early for some systems. 
- Cache<ImmutableBytesPtr,PTable> result = metaDataCache; + Cache<ImmutableBytesPtr,PMetaDataEntity> result = metaDataCache; if (result == null) { synchronized(this) { result = metaDataCache; @@ -82,9 +84,9 @@ public class GlobalCache extends TenantCacheImpl { metaDataCache = result = CacheBuilder.newBuilder() .maximumWeight(maxSize) .expireAfterAccess(maxTTL, TimeUnit.MILLISECONDS) - .weigher(new Weigher<ImmutableBytesPtr, PTable>() { + .weigher(new Weigher<ImmutableBytesPtr, PMetaDataEntity>() { @Override - public int weigh(ImmutableBytesPtr key, PTable table) { + public int weigh(ImmutableBytesPtr key, PMetaDataEntity table) { return SizedUtil.IMMUTABLE_BYTES_PTR_SIZE + key.getLength() + table.getEstimatedSize(); } }) @@ -157,4 +159,22 @@ public class GlobalCache extends TenantCacheImpl { } return tenantCache; } + + public static class FunctionBytesPtr extends ImmutableBytesPtr { + + public FunctionBytesPtr(byte[] key) { + super(key); + } + + @Override + public boolean equals(Object obj) { + if(obj instanceof FunctionBytesPtr) return super.equals(obj); + return false; + } + + @Override + public int hashCode() { + return super.hashCode(); + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java index 7bb210b..55253ea 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ColumnResolver.java @@ -20,6 +20,7 @@ package org.apache.phoenix.compile; import java.sql.SQLException; import java.util.List; +import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.TableRef; @@ -41,6 +42,11 @@ public 
interface ColumnResolver { public List<TableRef> getTables(); /** + * Returns the collection of resolved functions. + */ + public List<PFunction> getFunctions(); + + /** * Resolves table using name or alias. * @param schemaName the schema name * @param tableName the table name or table alias @@ -60,4 +66,15 @@ public interface ColumnResolver { * @throws AmbiguousColumnException if the column name is ambiguous */ public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException; + + /** + * Resolves function using functionName. + * @param functionName + * @return the resolved PFunction + * @throws ColumnNotFoundException if the column could not be resolved + * @throws AmbiguousColumnException if the column name is ambiguous + */ + public PFunction resolveFunction(String functionName) throws SQLException; + + public boolean hasUDFs(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java new file mode 100644 index 0000000..2e3a873 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.compile; + +import java.sql.ParameterMetaData; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.Collections; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.CreateFunctionStatement; +import org.apache.phoenix.schema.MetaDataClient; + +public class CreateFunctionCompiler { + + private final PhoenixStatement statement; + + public CreateFunctionCompiler(PhoenixStatement statement) { + this.statement = statement; + } + + public MutationPlan compile(final CreateFunctionStatement create) throws SQLException { + final PhoenixConnection connection = statement.getConnection(); + PhoenixConnection connectionToBe = connection; + final StatementContext context = new StatementContext(statement); + final MetaDataClient client = new MetaDataClient(connectionToBe); + + return new MutationPlan() { + + @Override + public ParameterMetaData getParameterMetaData() { + return context.getBindManager().getParameterMetaData(); + } + + @Override + public MutationState execute() throws SQLException { + try { + return client.createFunction(create); + } finally { + if (client.getConnection() != connection) { + client.getConnection().close(); + } + } + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("CREATE FUNCTION")); + } + + @Override + 
public PhoenixConnection getConnection() { + return connection; + } + + @Override + public StatementContext getContext() { + return context; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java index 07d9f56..f1937a5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java @@ -46,7 +46,7 @@ public class CreateIndexCompiler { public MutationPlan compile(final CreateIndexStatement create) throws SQLException { final PhoenixConnection connection = statement.getConnection(); - final ColumnResolver resolver = FromCompiler.getResolver(create, connection); + final ColumnResolver resolver = FromCompiler.getResolver(create, connection, create.getUdfParseNodes()); Scan scan = new Scan(); final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); ExpressionCompiler expressionCompiler = new ExpressionCompiler(context); http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 4f6a719..575f0f3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -323,7 +323,7 @@ public class DeleteCompiler { hint, false, aliasedNodes, delete.getWhere(), 
Collections.<ParseNode>emptyList(), null, delete.getOrderBy(), delete.getLimit(), - delete.getBindCount(), false, false, Collections.<SelectStatement>emptyList()); + delete.getBindCount(), false, false, Collections.<SelectStatement>emptyList(), delete.getUdfParseNodes()); select = StatementNormalizer.normalize(select, resolver); SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolver, connection); if (transformedSelect != select) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java index ab6b851..92899a6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.compile; + import java.math.BigDecimal; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; @@ -70,6 +71,7 @@ import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression; import org.apache.phoenix.expression.function.ArrayElemRefExpression; import org.apache.phoenix.expression.function.RoundDecimalExpression; import org.apache.phoenix.expression.function.RoundTimestampExpression; +import org.apache.phoenix.expression.function.UDFExpression; import org.apache.phoenix.parse.AddParseNode; import org.apache.phoenix.parse.AndParseNode; import org.apache.phoenix.parse.ArithmeticParseNode; @@ -95,12 +97,14 @@ import org.apache.phoenix.parse.ModulusParseNode; import org.apache.phoenix.parse.MultiplyParseNode; import org.apache.phoenix.parse.NotParseNode; import org.apache.phoenix.parse.OrParseNode; +import org.apache.phoenix.parse.PFunction; import 
org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.RowValueConstructorParseNode; import org.apache.phoenix.parse.SequenceValueParseNode; import org.apache.phoenix.parse.StringConcatParseNode; import org.apache.phoenix.parse.SubqueryParseNode; import org.apache.phoenix.parse.SubtractParseNode; +import org.apache.phoenix.parse.UDFParseNode; import org.apache.phoenix.parse.UnsupportedAllParseNodeVisitor; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; @@ -313,8 +317,19 @@ public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expressio * @param children the child expression arguments to the function expression node. */ public Expression visitLeave(FunctionParseNode node, List<Expression> children) throws SQLException { + PFunction function = null; + if(node instanceof UDFParseNode) { + function = context.getResolver().resolveFunction(node.getName()); + BuiltInFunctionInfo info = new BuiltInFunctionInfo(function); + node = new UDFParseNode(node.getName(), node.getChildren(), info); + } children = node.validate(children, context); - Expression expression = node.create(children, context); + Expression expression = null; + if (function == null) { + expression = node.create(children, context); + } else { + expression = node.create(children, function, context); + } ImmutableBytesWritable ptr = context.getTempPtr(); BuiltInFunctionInfo info = node.getInfo(); for (int i = 0; i < info.getRequiredArgCount(); i++) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index da78b24..5fe0e6f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@ -22,9 +22,11 @@ import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTableInterface; @@ -35,12 +37,15 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.parse.AliasedNode; import org.apache.phoenix.parse.BindTableNode; import org.apache.phoenix.parse.ColumnDef; +import org.apache.phoenix.parse.CreateFunctionStatement; import org.apache.phoenix.parse.CreateTableStatement; import org.apache.phoenix.parse.DMLStatement; import org.apache.phoenix.parse.DerivedTableNode; import org.apache.phoenix.parse.FamilyWildcardParseNode; import org.apache.phoenix.parse.JoinTableNode; +import org.apache.phoenix.parse.NamedNode; import org.apache.phoenix.parse.NamedTableNode; +import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.SelectStatement; @@ -49,6 +54,7 @@ import org.apache.phoenix.parse.TableName; import org.apache.phoenix.parse.TableNode; import org.apache.phoenix.parse.TableNodeVisitor; import org.apache.phoenix.parse.TableWildcardParseNode; +import org.apache.phoenix.parse.UDFParseNode; import org.apache.phoenix.parse.WildcardParseNode; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryConstants; @@ -57,6 +63,7 @@ import org.apache.phoenix.schema.AmbiguousTableException; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.ColumnRef; +import org.apache.phoenix.schema.FunctionNotFoundException; import 
org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; @@ -103,6 +110,11 @@ public class FromCompiler { } @Override + public List<PFunction> getFunctions() { + return Collections.emptyList(); + } + + @Override public TableRef resolveTable(String schemaName, String tableName) throws SQLException { throw new UnsupportedOperationException(); @@ -112,6 +124,14 @@ public class FromCompiler { public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException { throw new UnsupportedOperationException(); } + + public PFunction resolveFunction(String functionName) throws SQLException { + throw new UnsupportedOperationException(); + }; + + public boolean hasUDFs() { + return false; + }; }; public static ColumnResolver getResolverForCreation(final CreateTableStatement statement, final PhoenixConnection connection) @@ -141,7 +161,7 @@ public class FromCompiler { if (htable != null) Closeables.closeQuietly(htable); } tableNode = NamedTableNode.create(null, baseTable, statement.getColumnDefs()); - return new SingleTableColumnResolver(connection, tableNode, e.getTimeStamp()); + return new SingleTableColumnResolver(connection, tableNode, e.getTimeStamp(), new HashMap<String, UDFParseNode>(1)); } throw e; } @@ -166,9 +186,9 @@ public class FromCompiler { if (fromNode == null) return EMPTY_TABLE_RESOLVER; if (fromNode instanceof NamedTableNode) - return new SingleTableColumnResolver(connection, (NamedTableNode) fromNode, true, 1); + return new SingleTableColumnResolver(connection, (NamedTableNode) fromNode, true, 1, statement.getUdfParseNodes()); - MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection, 1); + MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection, 1, statement.getUdfParseNodes()); fromNode.accept(visitor); return visitor; } @@ -178,12 +198,24 @@ public class FromCompiler { return visitor; } + public static 
ColumnResolver getResolver(NamedTableNode tableNode, PhoenixConnection connection, Map<String, UDFParseNode> udfParseNodes) throws SQLException { + SingleTableColumnResolver visitor = + new SingleTableColumnResolver(connection, tableNode, true, 0, udfParseNodes); + return visitor; + } + public static ColumnResolver getResolver(SingleTableStatement statement, PhoenixConnection connection) throws SQLException { SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), true); return visitor; } + public static ColumnResolver getResolver(SingleTableStatement statement, PhoenixConnection connection, Map<String, UDFParseNode> udfParseNodes) + throws SQLException { + SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), true, 0, udfParseNodes); + return visitor; + } + public static ColumnResolver getResolverForCompiledDerivedTable(PhoenixConnection connection, TableRef tableRef, RowProjector projector) throws SQLException { List<PColumn> projectedColumns = new ArrayList<PColumn>(); @@ -205,26 +237,32 @@ public class FromCompiler { return visitor; } + public static ColumnResolver getResolver(PhoenixConnection connection, TableRef tableRef, Map<String, UDFParseNode> udfParseNodes) + throws SQLException { + SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, tableRef, udfParseNodes); + return visitor; + } + public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection) throws SQLException { /* * We validate the meta data at commit time for mutations, as this allows us to do many UPSERT VALUES calls * without hitting the server each time to check if the meta data is up-to-date. 
*/ - SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), false); + SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), false, 0, statement.getUdfParseNodes()); return visitor; } - public static ColumnResolver getResolverForProjectedTable(PTable projectedTable) { - return new ProjectedTableColumnResolver(projectedTable); + public static ColumnResolver getResolverForProjectedTable(PTable projectedTable, PhoenixConnection connection, Map<String, UDFParseNode> udfParseNodes) throws SQLException { + return new ProjectedTableColumnResolver(projectedTable, connection, udfParseNodes); } private static class SingleTableColumnResolver extends BaseColumnResolver { private final List<TableRef> tableRefs; private final String alias; - public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp) throws SQLException { - super(connection, 0); + public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp, Map<String, UDFParseNode> udfParseNodes) throws SQLException { + super(connection, 0, false, udfParseNodes); List<PColumnFamily> families = Lists.newArrayListWithExpectedSize(table.getDynamicColumns().size()); for (ColumnDef def : table.getDynamicColumns()) { if (def.getColumnDefName().getFamilyName() != null) { @@ -239,11 +277,13 @@ public class FromCompiler { } public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException { - this(connection, tableNode, updateCacheImmediately, 0); + this(connection, tableNode, updateCacheImmediately, 0, new HashMap<String,UDFParseNode>(1)); } - public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately, int tsAddition) throws SQLException { - super(connection, tsAddition); + public SingleTableColumnResolver(PhoenixConnection 
connection, NamedTableNode tableNode, + boolean updateCacheImmediately, int tsAddition, + Map<String, UDFParseNode> udfParseNodes) throws SQLException { + super(connection, tsAddition, updateCacheImmediately, udfParseNodes); alias = tableNode.getAlias(); TableRef tableRef = createTableRef(tableNode, updateCacheImmediately); tableRefs = ImmutableList.of(tableRef); @@ -255,6 +295,12 @@ public class FromCompiler { tableRefs = ImmutableList.of(tableRef); } + public SingleTableColumnResolver(PhoenixConnection connection, TableRef tableRef, Map<String, UDFParseNode> udfParseNodes) throws SQLException { + super(connection, 0, false, udfParseNodes); + alias = tableRef.getTableAlias(); + tableRefs = ImmutableList.of(tableRef); + } + public SingleTableColumnResolver(TableRef tableRef) throws SQLException { super(null, 0); alias = tableRef.getTableAlias(); @@ -267,6 +313,11 @@ public class FromCompiler { } @Override + public List<PFunction> getFunctions() { + throw new UnsupportedOperationException(); + } + + @Override public TableRef resolveTable(String schemaName, String tableName) throws SQLException { TableRef tableRef = tableRefs.get(0); @@ -316,7 +367,6 @@ public class FromCompiler { : tableRef.getTable().getColumn(colName); return new ColumnRef(tableRef, column.getPosition()); } - } private static abstract class BaseColumnResolver implements ColumnResolver { @@ -326,11 +376,30 @@ public class FromCompiler { // on Windows because the millis timestamp granularity is so bad we sometimes won't // get the data back that we just upsert. private final int tsAddition; + protected final Map<String, PFunction> functionMap; + protected List<PFunction> functions; private BaseColumnResolver(PhoenixConnection connection, int tsAddition) { + this.connection = connection; + this.client = connection == null ? 
null : new MetaDataClient(connection); + this.tsAddition = tsAddition; + functionMap = new HashMap<String, PFunction>(1); + this.functions = Collections.<PFunction>emptyList(); + } + + private BaseColumnResolver(PhoenixConnection connection, int tsAddition, boolean updateCacheImmediately, Map<String, UDFParseNode> udfParseNodes) throws SQLException { this.connection = connection; this.client = connection == null ? null : new MetaDataClient(connection); this.tsAddition = tsAddition; + functionMap = new HashMap<String, PFunction>(1); + if (udfParseNodes.isEmpty()) { + functions = Collections.<PFunction> emptyList(); + } else { + functions = createFunctionRef(new ArrayList<String>(udfParseNodes.keySet()), updateCacheImmediately); + for (PFunction function : functions) { + functionMap.put(function.getFunctionName(), function); + } + } } protected TableRef createTableRef(NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException { @@ -383,6 +452,85 @@ public class FromCompiler { return tableRef; } + @Override + public List<PFunction> getFunctions() { + return functions; + } + + private List<PFunction> createFunctionRef(List<String> functionNames, boolean updateCacheImmediately) throws SQLException { + long timeStamp = QueryConstants.UNSET_TIMESTAMP; + int numFunctions = functionNames.size(); + List<PFunction> functionsFound = new ArrayList<PFunction>(functionNames.size()); + if (updateCacheImmediately || connection.getAutoCommit()) { + getFunctionFromCache(functionNames, functionsFound, true); + if(functionNames.isEmpty()) { + return functionsFound; + } + MetaDataMutationResult result = client.updateCache(functionNames); + timeStamp = result.getMutationTime(); + functionsFound = result.getFunctions(); + if(functionNames.size() != functionsFound.size()){ + throw new FunctionNotFoundException("Some of the functions in "+functionNames.toString()+" are not found"); + } + } else { + getFunctionFromCache(functionNames, functionsFound, false); + // We always 
attempt to update the cache in the event of a FunctionNotFoundException + MetaDataMutationResult result = null; + if (!functionNames.isEmpty()) { + result = client.updateCache(functionNames); + } + if(result!=null) { + if (!result.getFunctions().isEmpty()) { + functionsFound.addAll(result.getFunctions()); + } + if(result.wasUpdated()) { + timeStamp = result.getMutationTime(); + } + } + if (functionsFound.size()!=numFunctions) { + throw new FunctionNotFoundException("Some of the functions in "+functionNames.toString()+" are not found", timeStamp); + } + } + if (timeStamp != QueryConstants.UNSET_TIMESTAMP) { + timeStamp += tsAddition; + } + + if (logger.isDebugEnabled() && timeStamp != QueryConstants.UNSET_TIMESTAMP) { + logger.debug(LogUtil.addCustomAnnotations("Re-resolved stale function " + functionNames.toString() + "at timestamp " + timeStamp, connection)); + } + return functionsFound; + } + + private void getFunctionFromCache(List<String> functionNames, + List<PFunction> functionsFound, boolean getOnlyTemporyFunctions) { + Iterator<String> iterator = functionNames.iterator(); + while(iterator.hasNext()) { + PFunction function = null; + String functionName = iterator.next(); + try { + function = connection.getMetaDataCache().getFunction(new PTableKey(connection.getTenantId(), functionName)); + } catch (FunctionNotFoundException e1) { + if (connection.getTenantId() != null) { // Check with null tenantId next + try { + function = connection.getMetaDataCache().getFunction(new PTableKey(null, functionName)); + } catch (FunctionNotFoundException e2) { + } + } + } + if (function != null) { + if (getOnlyTemporyFunctions) { + if (function.isTemporaryFunction()) { + functionsFound.add(function); + iterator.remove(); + } + } else { + functionsFound.add(function); + iterator.remove(); + } + } + } + } + protected PTable addDynamicColumns(List<ColumnDef> dynColumns, PTable theTable) throws SQLException { if (!dynColumns.isEmpty()) { @@ -409,6 +557,20 @@ public class 
FromCompiler { } return theTable; } + + @Override + public PFunction resolveFunction(String functionName) throws SQLException { + PFunction function = functionMap.get(functionName); + if(function == null) { + throw new FunctionNotFoundException(functionName); + } + return function; + } + + @Override + public boolean hasUDFs() { + return !functions.isEmpty(); + } } private static class MultiTableColumnResolver extends BaseColumnResolver implements TableNodeVisitor<Void> { @@ -421,6 +583,12 @@ public class FromCompiler { tables = Lists.newArrayList(); } + private MultiTableColumnResolver(PhoenixConnection connection, int tsAddition, Map<String, UDFParseNode> udfParseNodes) throws SQLException { + super(connection, tsAddition, false, udfParseNodes); + tableMap = ArrayListMultimap.<String, TableRef> create(); + tables = Lists.newArrayList(); + } + @Override public List<TableRef> getTables() { return tables; @@ -580,16 +748,14 @@ public class FromCompiler { } } } - } private static class ProjectedTableColumnResolver extends MultiTableColumnResolver { private final boolean isLocalIndex; private final List<TableRef> theTableRefs; private final Map<ColumnRef, Integer> columnRefMap; - - private ProjectedTableColumnResolver(PTable projectedTable) { - super(null, 0); + private ProjectedTableColumnResolver(PTable projectedTable, PhoenixConnection conn, Map<String, UDFParseNode> udfParseNodes) throws SQLException { + super(conn, 0, udfParseNodes); Preconditions.checkArgument(projectedTable.getType() == PTableType.PROJECTED); this.isLocalIndex = projectedTable.getIndexType() == IndexType.LOCAL; this.columnRefMap = new HashMap<ColumnRef, Integer>(); @@ -615,6 +781,7 @@ public class FromCompiler { this.columnRefMap.put(new ColumnRef(tableRef, colRef.getColumnPosition()), column.getPosition()); } this.theTableRefs = ImmutableList.of(new TableRef(ParseNodeFactory.createTempAlias(), projectedTable, ts, false)); + } @Override