Repository: incubator-hawq Updated Branches: refs/heads/master 9559ba1fe -> 7a22bdd78
HAWQ-1108. JDBC PXF Plugin Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/7a22bdd7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/7a22bdd7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/7a22bdd7 Branch: refs/heads/master Commit: 7a22bdd78f78de85c4d6569c49f60868264d0398 Parents: 9559ba1 Author: Devin Jia <[email protected]> Authored: Tue Mar 14 14:14:11 2017 -0700 Committer: shivzone <[email protected]> Committed: Tue Mar 14 14:14:11 2017 -0700 ---------------------------------------------------------------------- pxf/build.gradle | 32 ++ pxf/pxf-jdbc/README.md | 141 +++++++++ .../pxf/plugins/jdbc/JdbcFilterBuilder.java | 149 +++++++++ .../plugins/jdbc/JdbcPartitionFragmenter.java | 308 +++++++++++++++++++ .../hawq/pxf/plugins/jdbc/JdbcPlugin.java | 115 +++++++ .../hawq/pxf/plugins/jdbc/JdbcReadAccessor.java | 122 ++++++++ .../hawq/pxf/plugins/jdbc/JdbcReadResolver.java | 103 +++++++ .../hawq/pxf/plugins/jdbc/WhereSQLBuilder.java | 140 +++++++++ .../hawq/pxf/plugins/jdbc/utils/ByteUtil.java | 84 +++++ .../hawq/pxf/plugins/jdbc/utils/DbProduct.java | 49 +++ .../pxf/plugins/jdbc/utils/MysqlProduct.java | 31 ++ .../pxf/plugins/jdbc/utils/OracleProduct.java | 30 ++ .../pxf/plugins/jdbc/utils/PostgresProduct.java | 30 ++ .../pxf/plugins/jdbc/JdbcFilterBuilderTest.java | 101 ++++++ .../jdbc/JdbcPartitionFragmenterTest.java | 235 ++++++++++++++ .../hawq/pxf/plugins/jdbc/SqlBuilderTest.java | 175 +++++++++++ .../src/main/resources/pxf-profiles-default.xml | 9 + pxf/settings.gradle | 3 +- 18 files changed, 1856 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/build.gradle ---------------------------------------------------------------------- diff --git a/pxf/build.gradle b/pxf/build.gradle index d604b96..e9d04af 100644 --- 
a/pxf/build.gradle +++ b/pxf/build.gradle @@ -437,6 +437,38 @@ project('pxf-hbase') { } } + +project('pxf-jdbc') { + dependencies { + compile(project(':pxf-api')) + compile(project(':pxf-service')) + compile "org.apache.hadoop:hadoop-common:$hadoopVersion" + compile "org.apache.hadoop:hadoop-hdfs:$hadoopVersion" + testCompile "mysql:mysql-connector-java:5.1.6" + } + tasks.withType(JavaCompile) { + options.encoding = "UTF-8" + } + + ospackage { + packageName = versionedPackageName("${project.name}") + summary = 'HAWQ Extension Framework (PXF), JDBC plugin' + description = 'Querying external data stored in Relation Database using JDBC.' + packager = ' ' + packageGroup = 'Development/Libraries' + release = buildNumber() + '.' + project.osFamily + buildHost = ' ' + + requires(versionedPackageName('pxf-service'), project.version, GREATER | EQUAL) + + from(jar.outputs.files) { + into "/usr/lib/pxf-${project.version}" + } + + link("/usr/lib/pxf-${project.version}/${project.name}.jar", "${project.name}-${project.version}.jar") + } +} + def buildNumber() { System.getenv('BUILD_NUMBER') ?: System.getProperty('user.name') } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/README.md ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/README.md b/pxf/pxf-jdbc/README.md new file mode 100644 index 0000000..e8c4bc0 --- /dev/null +++ b/pxf/pxf-jdbc/README.md @@ -0,0 +1,141 @@ +# Accessing Jdbc Table Data + +The PXF JDBC plug-in reads data stored in Traditional relational database,ie : mysql,ORACLE,postgresql. + +PXF-JDBC plug-in is the client of the database, the host running the database engine does not need to +deploy PXF. + + +# Prerequisites + +Check the following before using PXF to access JDBC Table: +* The PXF JDBC plug-in is installed on all cluster nodes. +* The JDBC JAR files are installed on all cluster nodes, and added to file - 'pxf-public.classpath' +* You have tested PXF on HDFS. 
+ +# Using PXF Tables to Query JDBC Table +Jdbc tables are defined in same schema in PXF.The PXF table has the same column name +as Jdbc Table, and the column type requires a mapping of Jdbc-HAWQ. + +## Syntax Example +The following PXF table definition is valid for Jdbc Table. + + CREATE [READABLE|WRITABLE] EXTERNAL TABLE table_name + ( column_name data_type [, ...] | LIKE other_table ) + LOCATION ('pxf://namenode[:port]/jdbc-schema-name.jdbc-table-name?<pxf-parameters><&custom-parameters>') + FORMAT 'CUSTOM' (formatter='pxfwritable_import') +If `jdbc-schema-name` is omitted, pxf will default to the `default` schema. + +The `column_name` must exists in jdbc-table,`data_type` equals or similar to +the jdbc-column type. + +where `<pxf-parameters>` is: + + [FRAGMENTER=org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter + &ACCESSOR=org.apache.hawq.pxf.plugins.jdbc.JdbcReadAccessor + &RESOLVER=org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver] + | PROFILE=Jdbc + +where `<custom-parameters>` is: + + JDBC_DRIVER=<jdbc-driver-class-name> + &DB_URL=<jdbc-url>&USER=<database-user>&PASS=<password> + +## Jdbc Table to HAWQ Data Type Mapping +Jdbc-table and hawq-table data type system is similar to, does not require +a special type of mapping. +# Usage +The following to mysql, for example, describes the use of PDF-JDBC. + +To query MySQL Table in HAWQ, perform the following steps: +1. create Table in MySQL + + mysql> use demodb; + mysql> create table myclass( + id int(4) not null primary key, + name varchar(20) not null, + gender int(4) not null default '0', + degree double(16,2));` +2. insert test data + + insert into myclass values(1,"tom",1,90); + insert into myclass values(2,'john',0,94); + insert into myclass values(3,'simon',1,79); +3. copy mysql-jdbc jar files to `/usr/lib/pxf` (on all cluster nodes), and +edit `/etc/pxf/conf/pxf-public.classpath` , add : + + /usr/lib/pxf/mysql-connector-java-*.jar + + Restart all pxf-engine. + +4. 
create Table in HAWQ: + + gpadmin=# CREATE EXTERNAL TABLE myclass(id integer, + name text, + gender integer, + degree float8) + LOCATION ('pxf://localhost:51200/demodb.myclass' + '?PROFILE=JDBC' + '&JDBC_DRIVER=com.mysql.jdbc.Driver' + '&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root' + ) + FORMAT 'CUSTOM' (Formatter='pxfwritable_import'); + +MySQL instance IP: 192.168.200.6, port: 3306. + +5. query mysql data in HAWQ: + + gpadmin=# select * from myclass; + gpadmin=# select * from myclass where id=2; + +# Jdbc Table Fragments +## intro +PXF-JDBC plug-in as a client to access jdbc database.By default, there is +only one pxf-instance connectied JDBC Table.If the jdbc table data is large, +you can also use multiple pxf-instance to access the JDBC table by fragments. + +## Syntax +where `<custom-parameters>` can use following partition parameters: + + PARTITION_BY=column_name:column_type&RANGE=start_value[:end_value]&INTERVAL=interval_num[:interval_unit] +The `PARTITION_BY` parameter indicates which column to use as the partition column. +It can be split by colon(':'),the `column_type` current supported : `date|int|enum` . +The Date format is `yyyy-MM-dd`. +The `PARTITION_BY` parameter can be null, and there will be only one fragment. + +The `RANGE` parameter indicates the range of data to be queried , it can be split by colon(':'). + The range is left-closed, ie: `>= start_value AND < end_value` . + If the `column_type` is `int`, the `end_value` can be empty. + If the `column_type` is `enum`,the parameter `RANGE` can be empty. + +The `INTERVAL` parameter can be split by colon(':'), indicate the interval + value of one fragment. When `column_type` is `date`,this parameter must + be split by colon, and `interval_unit` can be `year|month|day`. When + `column_type` is int, the `interval_unit` can be empty. When `column_type` + is enum,the `INTERVAL` parameter can be empty. 
+ +The syntax examples is : + + * PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month' + * PARTITION_BY=year:int&RANGE=2008:2010&INTERVAL=1 + * PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad + +## Usage +MySQL Table: + + CREATE TABLE sales (id int primary key, cdate date, amt decimal(10,2),grade varchar(30)) +HAWQ Table: + + CREATE EXTERNAL TABLE sales(id integer, + cdate date, + amt float8, + grade text) + LOCATION ('pxf://localhost:51200/sales' + '?PROFILE=JDBC' + '&JDBC_DRIVER=com.mysql.jdbc.Driver' + '&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root' + '&PARTITION_BY=cdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:year' + ) + FORMAT 'CUSTOM' (Formatter='pxfwritable_import'); +At PXF-JDBC plugin,this will generate 2 fragments.Then HAWQ assign these fragments to 2 PXF-instance +to access jdbc table data. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java new file mode 100644 index 0000000..3c56ccb --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java @@ -0,0 +1,149 @@ +package org.apache.hawq.pxf.plugins.jdbc; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.BasicFilter; +import org.apache.hawq.pxf.api.FilterParser; +import org.apache.hawq.pxf.api.LogicalFilter; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +/** + * Uses the filter parser code to build a filter object, either simple - a + * single {@link BasicFilter} object or a + * compound - a {@link List} of + * {@link BasicFilter} objects. + * The subclass {@link WhereSQLBuilder} will use the filter for + * generate WHERE statement. + */ +public class JdbcFilterBuilder implements FilterParser.FilterBuilder { + /** + * Translates a filterString into a {@link BasicFilter} or a + * list of such filters. + * + * @param filterString the string representation of the filter + * @return a single {@link BasicFilter} + * object or a {@link List} of + * {@link BasicFilter} objects. 
+ * @throws Exception if parsing the filter failed or filter is not a basic + * filter or list of basic filters + */ + public Object getFilterObject(String filterString) throws Exception { + FilterParser parser = new FilterParser(this); + Object result = parser.parse(filterString.getBytes(FilterParser.DEFAULT_CHARSET)); + + if (!(result instanceof LogicalFilter) && !(result instanceof BasicFilter) + && !(result instanceof List)) { + throw new Exception("String " + filterString + + " resolved to no filter"); + } + + return result; + } + + + @Override + public Object build(FilterParser.LogicalOperation op, Object leftOperand, Object rightOperand) { + return handleLogicalOperation(op, leftOperand, rightOperand); + } + + @Override + public Object build(FilterParser.LogicalOperation op, Object filter) { + return handleLogicalOperation(op, filter); + } + + @Override + @SuppressWarnings("unchecked") + public Object build(FilterParser.Operation opId, Object leftOperand, + Object rightOperand) throws Exception { + // Assume column is on the left + return handleSimpleOperations(opId, + (FilterParser.ColumnIndex) leftOperand, + (FilterParser.Constant) rightOperand); + } + + @Override + public Object build(FilterParser.Operation operation, Object operand) throws Exception { + if (operation == FilterParser.Operation.HDOP_IS_NULL || operation == FilterParser.Operation.HDOP_IS_NOT_NULL) { + // use null for the constant value of null comparison + return handleSimpleOperations(operation, (FilterParser.ColumnIndex) operand, null); + } else { + throw new Exception("Unsupported unary operation " + operation); + } + } + + /* + * Handles simple column-operator-constant expressions Creates a special + * filter in the case the column is the row key column + */ + private BasicFilter handleSimpleOperations(FilterParser.Operation opId, + FilterParser.ColumnIndex column, + FilterParser.Constant constant) { + return new BasicFilter(opId, column, constant); + } + + /** + * Handles AND of 
already calculated expressions. Currently only AND, in the + * future OR can be added + * + * Four cases here: + * <ol> + * <li>both are simple filters</li> + * <li>left is a FilterList and right is a filter</li> + * <li>left is a filter and right is a FilterList</li> + * <li>both are FilterLists</li> + * </ol> + * Currently, 1, 2 can occur, since no parenthesis are used + * + * @param left left hand filter + * @param right right hand filter + * @return list of filters constructing the filter tree + */ + private List<BasicFilter> handleCompoundOperations(List<BasicFilter> left, + BasicFilter right) { + left.add(right); + return left; + } + + private List<BasicFilter> handleCompoundOperations(BasicFilter left, + BasicFilter right) { + List<BasicFilter> result = new LinkedList<>(); + + result.add(left); + result.add(right); + + return result; + } + + private Object handleLogicalOperation(FilterParser.LogicalOperation operator, Object leftOperand, Object rightOperand) { + + List<Object> result = new LinkedList<>(); + + result.add(leftOperand); + result.add(rightOperand); + return new LogicalFilter(operator, result); + } + + private Object handleLogicalOperation(FilterParser.LogicalOperation operator, Object filter) { + return new LogicalFilter(operator, Arrays.asList(filter)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java new file mode 100644 index 0000000..8b5886d --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java @@ -0,0 +1,308 @@ +package org.apache.hawq.pxf.plugins.jdbc; + +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.Fragmenter; +import org.apache.hawq.pxf.api.FragmentsStats; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.plugins.jdbc.utils.DbProduct; +import org.apache.hawq.pxf.plugins.jdbc.utils.ByteUtil; +import org.apache.hawq.pxf.api.Fragment; +import org.apache.hawq.pxf.api.utilities.InputData; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; + +/** + * Fragmenter class for JDBC data resources. + * + * Extends the {@link Fragmenter} abstract class, with the purpose of transforming + * an input data path (an JDBC Database table name and user request parameters) into a list of regions + * that belong to this table. + * <br> + * The parameter Patterns<br> + * There are three parameters, the format is as follows:<br> + * <pre> + * <code>PARTITION_BY=column_name:column_type&RANGE=start_value[:end_value]&INTERVAL=interval_num[:interval_unit]</code> + * </pre> + * The <code>PARTITION_BY</code> parameter can be split by colon(':'),the <code>column_type</code> current supported : <code>date,int,enum</code> . + * The Date format is 'yyyy-MM-dd'. 
<br> + * The <code>RANGE</code> parameter can be split by colon(':') ,used to identify the starting range of each fragment. + * The range is left-closed, ie:<code> '>= start_value AND < end_value' </code>.If the <code>column_type</code> is <code>int</code>, + * the <code>end_value</code> can be empty. If the <code>column_type</code>is <code>enum</code>,the parameter <code>RANGE</code> can be empty. <br> + * The <code>INTERVAL</code> parameter can be split by colon(':'), indicate the interval value of one fragment. + * When <code>column_type</code> is <code>date</code>,this parameter must be split by colon, and <code>interval_unit</code> can be <code>year,month,day</code>. + * When <code>column_type</code> is <code>int</code>, the <code>interval_unit</code> can be empty. + * When <code>column_type</code> is <code>enum</code>,the <code>INTERVAL</code> parameter can be empty. + * <br> + * <p> + * The syntax examples is :<br> + * <code>PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month'</code> <br> + * <code>PARTITION_BY=year:int&RANGE=2008:2010&INTERVAL=1</code> <br> + * <code>PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad</code> + * </p> + * + */ +public class JdbcPartitionFragmenter extends Fragmenter { + String[] partitionBy = null; + String[] range = null; + String[] interval = null; + PartitionType partitionType = null; + String partitionColumn = null; + IntervalType intervalType = null; + int intervalNum = 1; + + //when partitionType is DATE,it is valid + Calendar rangeStart = null; + Calendar rangeEnd = null; + + + enum PartitionType { + DATE, + INT, + ENUM; + + public static PartitionType getType(String str) { + return valueOf(str.toUpperCase()); + } + } + + enum IntervalType { + DAY, + MONTH, + YEAR; + + public static IntervalType type(String str) { + return valueOf(str.toUpperCase()); + } + } + + /** + * Constructor for JdbcPartitionFragmenter. 
+ * + * @param inConf input data such as which Jdbc table to scan + * @throws UserDataException if the request parameter is malformed + */ + public JdbcPartitionFragmenter(InputData inConf) throws UserDataException { + super(inConf); + if (inConf.getUserProperty("PARTITION_BY") == null) + return; + try { + partitionBy = inConf.getUserProperty("PARTITION_BY").split(":"); + partitionColumn = partitionBy[0]; + partitionType = PartitionType.getType(partitionBy[1]); + } catch (IllegalArgumentException e1) { + throw new UserDataException("The parameter 'PARTITION_BY' invalid, the pattern is 'column_name:date|int|enum'"); + } + + try { + range = inConf.getUserProperty("RANGE").split(":"); + } catch (IllegalArgumentException e1) { + throw new UserDataException("The parameter 'RANGE' invalid, the pattern is 'start_value[:end_value]'"); + } + try { + //parse and validate parameter-INTERVAL + if (inConf.getUserProperty("INTERVAL") != null) { + interval = inConf.getUserProperty("INTERVAL").split(":"); + intervalNum = Integer.parseInt(interval[0]); + if (interval.length > 1) + intervalType = IntervalType.type(interval[1]); + } + if (intervalNum < 1) + throw new UserDataException("The parameter 'INTERVAL' must > 1, but actual is '" + intervalNum + "'"); + } catch (IllegalArgumentException e1) { + throw new UserDataException("The parameter 'INTERVAL' invalid, the pattern is 'interval_num[:interval_unit]'"); + } catch (UserDataException e2) { + throw e2; + } + try { + if (partitionType == PartitionType.DATE) { + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd"); + rangeStart = Calendar.getInstance(); + rangeStart.setTime(df.parse(range[0])); + rangeEnd = Calendar.getInstance(); + rangeEnd.setTime(df.parse(range[1])); + } + } catch (ParseException e) { + throw new UserDataException("The parameter 'RANGE' include invalid date format."); + } + } + + /** + * Returns statistics for Jdbc table. Currently it's not implemented. 
+ * @throws UnsupportedOperationException ANALYZE for Jdbc plugin is not supported + */ + @Override + public FragmentsStats getFragmentsStats() throws UnsupportedOperationException { + throw new UnsupportedOperationException("ANALYZE for Jdbc plugin is not supported"); + } + + /** + * Returns list of fragments containing all of the + * Jdbc table data. + * + * @return a list of fragments + * @throws Exception if assign host error + */ + @Override + public List<Fragment> getFragments() throws Exception { + if (partitionType == null) { + byte[] fragmentMetadata = null; + byte[] userData = null; + Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData); + fragments.add(fragment); + return prepareHosts(fragments); + } + switch (partitionType) { + case DATE: { + int currInterval = intervalNum; + + Calendar fragStart = rangeStart; + while (fragStart.before(rangeEnd)) { + Calendar fragEnd = (Calendar) fragStart.clone(); + switch (intervalType) { + case DAY: + fragEnd.add(Calendar.DAY_OF_MONTH, currInterval); + break; + case MONTH: + fragEnd.add(Calendar.MONTH, currInterval); + break; + case YEAR: + fragEnd.add(Calendar.YEAR, currInterval); + break; + } + if (fragEnd.after(rangeEnd)) + fragEnd = (Calendar) rangeEnd.clone(); + + //make metadata of this fragment , converts the date to a millisecond,then get bytes. + byte[] msStart = ByteUtil.getBytes(fragStart.getTimeInMillis()); + byte[] msEnd = ByteUtil.getBytes(fragEnd.getTimeInMillis()); + byte[] fragmentMetadata = ByteUtil.mergeBytes(msStart, msEnd); + + byte[] userData = new byte[0]; + Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData); + fragments.add(fragment); + + //continue next fragment. 
+ fragStart = fragEnd; + } + break; + } + case INT: { + int rangeStart = Integer.parseInt(range[0]); + int rangeEnd = Integer.parseInt(range[1]); + int currInterval = intervalNum; + + //validate : curr_interval > 0 + int fragStart = rangeStart; + while (fragStart < rangeEnd) { + int fragEnd = fragStart + currInterval; + if (fragEnd > rangeEnd) fragEnd = rangeEnd; + + byte[] bStart = ByteUtil.getBytes(fragStart); + byte[] bEnd = ByteUtil.getBytes(fragEnd); + byte[] fragmentMetadata = ByteUtil.mergeBytes(bStart, bEnd); + + byte[] userData = new byte[0]; + Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData); + fragments.add(fragment); + + //continue next fragment. + fragStart = fragEnd;// + 1; + } + break; + } + case ENUM: + for (String frag : range) { + byte[] fragmentMetadata = frag.getBytes(); + Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, new byte[0]); + fragments.add(fragment); + } + break; + } + + return prepareHosts(fragments); + } + + /** + * For each fragment , assigned a host address. + * In Jdbc Plugin, 'replicas' is the host address of the PXF engine that is running, not the database engine. + * Since the other PXF host addresses can not be probed, only the host name of the current PXF engine is returned. + * @param fragments a list of fragments + * @return a list of fragments that assigned hosts. + * @throws UnknownHostException if InetAddress.getLocalHost error. 
+ */ + public static List<Fragment> prepareHosts(List<Fragment> fragments) throws UnknownHostException { + for (Fragment fragment : fragments) { + String pxfHost = InetAddress.getLocalHost().getHostAddress(); + String[] hosts = new String[]{pxfHost}; + fragment.setReplicas(hosts); + } + + return fragments; + } + + public String buildFragmenterSql(String dbName, String originSql) { + byte[] meta = inputData.getFragmentMetadata(); + if (meta == null) + return originSql; + + DbProduct dbProduct = DbProduct.getDbProduct(dbName); + + StringBuilder sb = new StringBuilder(originSql); + if (!originSql.contains("WHERE")) + sb.append(" WHERE 1=1 "); + + sb.append(" AND "); + switch (partitionType) { + case DATE: { + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd"); + //parse metadata of this fragment + //validateï¼the length of metadata == 16 (long) + byte[][] newb = ByteUtil.splitBytes(meta, 8); + Date fragStart = new Date(ByteUtil.toLong(newb[0])); + Date fragEnd = new Date(ByteUtil.toLong(newb[1])); + + sb.append(partitionColumn).append(" >= ").append(dbProduct.wrapDate(df.format(fragStart))); + sb.append(" AND "); + sb.append(partitionColumn).append(" < ").append(dbProduct.wrapDate(df.format(fragEnd))); + + break; + } + case INT: { + //validateï¼the length of metadata ==8 ï¼int) + byte[][] newb = ByteUtil.splitBytes(meta, 4); + int fragStart = ByteUtil.toInt(newb[0]); + int fragEnd = ByteUtil.toInt(newb[1]); + sb.append(partitionColumn).append(" >= ").append(fragStart); + sb.append(" AND "); + sb.append(partitionColumn).append(" < ").append(fragEnd); + break; + } + case ENUM: + sb.append(partitionColumn).append("='").append(new String(meta)).append("'"); + break; + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java ---------------------------------------------------------------------- diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java new file mode 100644 index 0000000..53848f1 --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java @@ -0,0 +1,115 @@ +package org.apache.hawq.pxf.plugins.jdbc; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; + +import java.sql.*; + +/** + * This class resolves the jdbc connection parameter and manages the opening and closing of the jdbc connection. + * Implemented subclasses: {@link JdbcReadAccessor}. 
+ * + */ +public class JdbcPlugin extends Plugin { + private static final Log LOG = LogFactory.getLog(JdbcPlugin.class); + + //jdbc connection parameters + protected String jdbcDriver = null; + protected String dbUrl = null; + protected String user = null; + protected String pass = null; + protected String tblName = null; + protected int batchSize = 100; + + //jdbc connection + protected Connection dbConn = null; + //database typeï¼from DatabaseMetaData.getDatabaseProductName() + protected String dbProduct = null; + + /** + * parse input data + * + * @param input the input data + * @throws UserDataException if the request parameter is malformed + */ + public JdbcPlugin(InputData input) throws UserDataException { + super(input); + jdbcDriver = input.getUserProperty("JDBC_DRIVER"); + dbUrl = input.getUserProperty("DB_URL"); + user = input.getUserProperty("USER"); + pass = input.getUserProperty("PASS"); + String strBatch = input.getUserProperty("BATCH_SIZE"); + if (strBatch != null) { + batchSize = Integer.parseInt(strBatch); + } + + if (jdbcDriver == null) { + throw new UserDataException("JDBC_DRIVER must be set"); + } + if (dbUrl == null) { + throw new UserDataException("DB_URL must be set(read)"); + } + + tblName = input.getDataSource(); + if (tblName == null) { + throw new UserDataException("TABLE_NAME must be set as DataSource."); + } else { + tblName = tblName.toUpperCase(); + } + } + + public String getTableName() { + return tblName; + } + + protected Connection openConnection() throws ClassNotFoundException, SQLException { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Open JDBC: driver=%s,url=%s,user=%s,pass=%s,table=%s", + jdbcDriver, dbUrl, user, pass, tblName)); + } + if (dbConn == null || dbConn.isClosed()) { + Class.forName(jdbcDriver); + if (user != null) { + dbConn = DriverManager.getConnection(dbUrl, user, pass); + } else { + dbConn = DriverManager.getConnection(dbUrl); + } + DatabaseMetaData meta = dbConn.getMetaData(); + dbProduct = 
meta.getDatabaseProductName(); + } + return dbConn; + } + + protected void closeConnection() { + try { + if (dbConn != null) { + dbConn.close(); + dbConn = null; + } + } catch (SQLException e) { + LOG.error("Close db connection error . ", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java new file mode 100644 index 0000000..2ca9a94 --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java @@ -0,0 +1,122 @@ +package org.apache.hawq.pxf.plugins.jdbc; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.sql.*; +import java.util.ArrayList; + +/** + * Accessor for Jdbc tables. The accessor will open and read a partition belonging + * to a Jdbc table. JdbcReadAccessor generates and executes SQL from filter and + * fragmented information, uses {@link JdbcReadResolver } to read the ResultSet, and generates + * the data type - List {@link OneRow} that HAWQ needs. + */ +public class JdbcReadAccessor extends JdbcPlugin implements ReadAccessor { + private static final Log LOG = LogFactory.getLog(JdbcReadAccessor.class); + + WhereSQLBuilder filterBuilder = null; + private ColumnDescriptor keyColumn = null; + + private String querySql = null; + private Statement statement = null; + private ResultSet resultSet = null; + + public JdbcReadAccessor(InputData input) throws UserDataException { + super(input); + filterBuilder = new WhereSQLBuilder(inputData); + + //buid select statement (not contain where statement) + ArrayList<ColumnDescriptor> columns = input.getTupleDescription(); + StringBuilder sb = new StringBuilder(); + sb.append("SELECT "); + for (int i = 0; i < columns.size(); i++) { + ColumnDescriptor column = columns.get(i); + if (column.isKeyColumn()) + keyColumn = column; + if (i > 0) sb.append(","); + sb.append(column.columnName()); + } + sb.append(" FROM ").append(getTableName()); + querySql = sb.toString(); + } + + /** + * open db connection, execute query sql + */ + @Override + public boolean openForRead() throws Exception { + if (statement != null && !statement.isClosed()) + return true; + super.openConnection(); + + statement = dbConn.createStatement(); + + resultSet = executeQuery(querySql); + + return true; + 
} + + public ResultSet executeQuery(String sql) throws Exception { + String query = sql; + if (inputData.hasFilter()) { + //parse filter string , build where statement + String whereSql = filterBuilder.buildWhereSQL(dbProduct); + + if (whereSql != null) { + query = query + " WHERE " + whereSql; + } + } + + //according to the fragment information, rewriting sql + JdbcPartitionFragmenter fragmenter = new JdbcPartitionFragmenter(inputData); + query = fragmenter.buildFragmenterSql(dbProduct, query); + + if (LOG.isDebugEnabled()) { + LOG.debug("executeQuery: " + query); + } + + return statement.executeQuery(query); + } + + @Override + public OneRow readNextObject() throws Exception { + if (resultSet.next()) { + return new OneRow(null, resultSet); + } + return null; + } + + @Override + public void closeForRead() throws Exception { + if (statement != null && !statement.isClosed()) { + statement.close(); + statement = null; + } + super.closeConnection(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java new file mode 100644 index 0000000..1c61537 --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java @@ -0,0 +1,103 @@ +package org.apache.hawq.pxf.plugins.jdbc; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.*; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; + +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +/** + * Class JdbcReadResolver Read the Jdbc ResultSet, and generates the data type - List {@link OneField}. + */ +public class JdbcReadResolver extends Plugin implements ReadResolver { + private static final Log LOG = LogFactory.getLog(JdbcReadResolver.class); + //HAWQ Table column definitions + private ArrayList<ColumnDescriptor> columns = null; + + public JdbcReadResolver(InputData input) { + super(input); + columns = input.getTupleDescription(); + } + + @Override + public List<OneField> getFields(OneRow row) throws Exception { + ResultSet result = (ResultSet) row.getData(); + LinkedList<OneField> fields = new LinkedList<>(); + + for (int i = 0; i < columns.size(); i++) { + ColumnDescriptor column = columns.get(i); + String colName = column.columnName(); + Object value = null; + + OneField oneField = new OneField(); + oneField.type = column.columnTypeCode(); + + switch (DataType.get(oneField.type)) { + case INTEGER: + value = result.getInt(colName); + break; + case FLOAT8: + value = result.getDouble(colName); + break; + case REAL: + value = result.getFloat(colName); + break; + case BIGINT: + value = 
result.getLong(colName); + break; + case SMALLINT: + value = result.getShort(colName); + break; + case BOOLEAN: + value = result.getBoolean(colName); + break; + case BYTEA: + value = result.getBytes(colName); + break; + case VARCHAR: + case BPCHAR: + case TEXT: + case NUMERIC: + value = result.getString(colName); + break; + case TIMESTAMP: + case DATE: + value = result.getDate(colName); + break; + default: + throw new UnsupportedOperationException("Unknwon Field Type : " + DataType.get(oneField.type).toString() + + ", Column : " + column.toString()); + } + oneField.val = value; + fields.add(oneField); + } + return fields; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java new file mode 100644 index 0000000..541aa86 --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java @@ -0,0 +1,140 @@ +package org.apache.hawq.pxf.plugins.jdbc; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hawq.pxf.api.LogicalFilter; +import org.apache.hawq.pxf.plugins.jdbc.utils.DbProduct; +import org.apache.hawq.pxf.api.BasicFilter; +import org.apache.hawq.pxf.api.FilterParser; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; + +/** + * Parse filter object generated by parent class {@link org.apache.hawq.pxf.plugins.jdbc.JdbcFilterBuilder}, + * and build WHERE statement. + * For Multiple filters , currently only support HDOP_AND . + * The unsupported Filter operation and LogicalOperation ,will return null statement. + * + */ +public class WhereSQLBuilder extends JdbcFilterBuilder { + private InputData inputData; + + public WhereSQLBuilder(InputData input) { + inputData = input; + } + + /** + * 1.check for LogicalOperator, Jdbc currently only support HDOP_AND. + * 2.and convert to BasicFilter List. 
+ */ + private static List<BasicFilter> convertBasicFilterList(Object filter, List<BasicFilter> returnList) throws UnsupportedFilterException { + if (returnList == null) + returnList = new ArrayList<>(); + if (filter instanceof BasicFilter) { + returnList.add((BasicFilter) filter); + return returnList; + } + LogicalFilter lfilter = (LogicalFilter) filter; + if (lfilter.getOperator() != FilterParser.LogicalOperation.HDOP_AND) + throw new UnsupportedFilterException("unsupported LogicalOperation : " + lfilter.getOperator()); + for (Object f : lfilter.getFilterList()) { + returnList = convertBasicFilterList(f, returnList); + } + return returnList; + } + + public String buildWhereSQL(String db_product) throws Exception { + if (!inputData.hasFilter()) + return null; + List<BasicFilter> filters = null; + try { + String filterString = inputData.getFilterString(); + Object filterObj = getFilterObject(filterString); + + filters = convertBasicFilterList(filterObj, filters); + StringBuffer sb = new StringBuffer("1=1"); + for (Object obj : filters) { + BasicFilter filter = (BasicFilter) obj; + sb.append(" AND "); + + ColumnDescriptor column = inputData.getColumn(filter.getColumn().index()); + //the column name of filter + sb.append(column.columnName()); + + //the operation of filter + FilterParser.Operation op = filter.getOperation(); + switch (op) { + case HDOP_LT: + sb.append("<"); + break; + case HDOP_GT: + sb.append(">"); + break; + case HDOP_LE: + sb.append("<="); + break; + case HDOP_GE: + sb.append(">="); + break; + case HDOP_EQ: + sb.append("="); + break; + default: + throw new UnsupportedFilterException("unsupported Filter operation : " + op); + } + + DbProduct dbProduct = DbProduct.getDbProduct(db_product); + Object val = filter.getConstant().constant(); + switch (DataType.get(column.columnTypeCode())) { + case SMALLINT: + case INTEGER: + case BIGINT: + case FLOAT8: + case REAL: + case BOOLEAN: + sb.append(val.toString()); + break; + case TEXT: + 
sb.append("'").append(val.toString()).append("'"); + break; + case DATE: + //According to the database products, for the date field for special treatment. + sb.append(dbProduct.wrapDate(val)); + break; + default: + throw new UnsupportedFilterException("unsupported column type for filtering : " + column.columnTypeCode()); + } + + } + return sb.toString(); + } catch (UnsupportedFilterException ex) { + return null; + } + } + + static class UnsupportedFilterException extends Exception { + UnsupportedFilterException(String message) { + super(message); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java new file mode 100644 index 0000000..cdca8a6 --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java @@ -0,0 +1,84 @@ +package org.apache.hawq.pxf.plugins.jdbc.utils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * A tool class used for byte-array merging, splitting and big-endian
 * integer/long encoding and decoding.
 */
public class ByteUtil {

    /**
     * Concatenates two byte arrays into a new array.
     * Null semantics match commons-lang ArrayUtils.addAll (which this replaces,
     * dropping the third-party dependency): a null side yields a clone of the
     * other; both null yields null.
     */
    public static byte[] mergeBytes(byte[] b1, byte[] b2) {
        if (b1 == null) {
            return b2 == null ? null : b2.clone();
        }
        if (b2 == null) {
            return b1.clone();
        }
        byte[] merged = new byte[b1.length + b2.length];
        System.arraycopy(b1, 0, merged, 0, b1.length);
        System.arraycopy(b2, 0, merged, b1.length, b2.length);
        return merged;
    }

    /**
     * Splits a byte array into consecutive chunks of n bytes each.
     * Trailing bytes that do not fill a whole chunk are dropped
     * (same behavior as the original index-walking implementation).
     */
    public static byte[][] splitBytes(byte[] bytes, int n) {
        int chunks = bytes.length / n;
        byte[][] newBytes = new byte[chunks][];
        for (int i = 0; i < chunks; i++) {
            newBytes[i] = new byte[n];
            System.arraycopy(bytes, i * n, newBytes[i], 0, n);
        }
        return newBytes;
    }

    /** Encodes a long as 8 bytes, big-endian (most significant byte first). */
    public static byte[] getBytes(long value) {
        byte[] b = new byte[8];
        b[0] = (byte) ((value >> 56) & 0xFF);
        b[1] = (byte) ((value >> 48) & 0xFF);
        b[2] = (byte) ((value >> 40) & 0xFF);
        b[3] = (byte) ((value >> 32) & 0xFF);
        b[4] = (byte) ((value >> 24) & 0xFF);
        b[5] = (byte) ((value >> 16) & 0xFF);
        b[6] = (byte) ((value >> 8) & 0xFF);
        b[7] = (byte) (value & 0xFF);
        return b;
    }

    /** Encodes an int as 4 bytes, big-endian (most significant byte first). */
    public static byte[] getBytes(int value) {
        byte[] b = new byte[4];
        b[0] = (byte) ((value >> 24) & 0xFF);
        b[1] = (byte) ((value >> 16) & 0xFF);
        b[2] = (byte) ((value >> 8) & 0xFF);
        b[3] = (byte) (value & 0xFF);
        return b;
    }

    /**
     * Decodes a big-endian 4-byte array back into an int.
     * Fix: the original shifted by 32/40/48/56, which only produced the right
     * answer because Java masks int shift distances to their low 5 bits
     * (32 -> 0, 40 -> 8, ...). The intended shifts are written explicitly here.
     */
    public static int toInt(byte[] b) {
        return ((b[3] & 0xFF))
                + ((b[2] & 0xFF) << 8)
                + ((b[1] & 0xFF) << 16)
                + ((b[0] & 0xFF) << 24);
    }

    /** Decodes a big-endian 8-byte array back into a long. */
    public static long toLong(byte[] b) {
        return ((((long) b[7]) & 0xFF)
                + ((((long) b[6]) & 0xFF) << 8)
                + ((((long) b[5]) & 0xFF) << 16)
                + ((((long) b[4]) & 0xFF) << 24)
                + ((((long) b[3]) & 0xFF) << 32)
                + ((((long) b[2]) & 0xFF) << 40)
                + ((((long) b[1]) & 0xFF) << 48)
                + ((((long) b[0]) & 0xFF) << 56));
    }
}
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java new file mode 100644 index 0000000..30ff1fe --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java @@ -0,0 +1,49 @@ +package org.apache.hawq.pxf.plugins.jdbc.utils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * As the syntax of different database products are not the same, such as the date type field for processing, ORACLE use to_date () function, and mysql use Date () function. + So we create this class to abstract public methods, the specific database products can implementation of these methods. 
+ */ +public abstract class DbProduct { + //wrap date string + public abstract String wrapDate(Object date_val); + + + public static DbProduct getDbProduct(String dbName) { + if (dbName.toUpperCase().contains("MYSQL")) + return new MysqlProduct(); + else if (dbName.toUpperCase().contains("ORACLE")) + return new OracleProduct(); + else if (dbName.toUpperCase().contains("POSTGRES")) + return new PostgresProduct(); + else + //Unsupported databases may execute errors + return new CommonProduct(); + } +} + +class CommonProduct extends DbProduct { + @Override + public String wrapDate(Object dateVal) { + return "date'" + dateVal + "'"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java new file mode 100644 index 0000000..2e60ada --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java @@ -0,0 +1,31 @@ +package org.apache.hawq.pxf.plugins.jdbc.utils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Implements methods for MySQL Database. + */ +public class MysqlProduct extends DbProduct { + + @Override + public String wrapDate(Object dateVal){ + return "DATE('" + dateVal + "')"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java new file mode 100644 index 0000000..b46c5f3 --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java @@ -0,0 +1,30 @@ +package org.apache.hawq.pxf.plugins.jdbc.utils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Implements methods for Oracle Database. 
+ */ +public class OracleProduct extends DbProduct { + @Override + public String wrapDate(Object dateVal) { + return "to_date('" + dateVal + "','yyyy-mm-dd')"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java new file mode 100644 index 0000000..901cf2e --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java @@ -0,0 +1,30 @@ +package org.apache.hawq.pxf.plugins.jdbc.utils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Implements methods for Postgres Database. 
+ */ +public class PostgresProduct extends DbProduct { + @Override + public String wrapDate(Object dateVal) { + return "date'" + dateVal + "'"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilderTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilderTest.java b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilderTest.java new file mode 100644 index 0000000..1b1191c --- /dev/null +++ b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilderTest.java @@ -0,0 +1,101 @@ +package org.apache.hawq.pxf.plugins.jdbc; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
import org.apache.hawq.pxf.api.BasicFilter;
import org.apache.hawq.pxf.api.FilterParser;
import org.apache.hawq.pxf.api.FilterParser.LogicalOperation;
import org.apache.hawq.pxf.api.LogicalFilter;
import org.junit.Test;

import static org.apache.hawq.pxf.api.FilterParser.Operation.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

/**
 * Unit tests for {@link JdbcFilterBuilder}: verify that serialized HAWQ filter
 * strings are parsed into the expected BasicFilter/LogicalFilter trees.
 */
public class JdbcFilterBuilderTest {
    @Test
    public void parseFilterWithThreeOperations() throws Exception {
        //origin sql => col_1>'2008-02-01' and col_1<'2008-12-01' or col_2 > 1200
        String filterstr = "a1c25s10d2008-02-01o2a1c25s10d2008-12-01o1l0a2c20s4d1200o2l1";
        JdbcFilterBuilder builder = new JdbcFilterBuilder();

        LogicalFilter filterList = (LogicalFilter) builder.getFilterObject(filterstr);
        // top level: (AND subtree) OR (basic filter)
        assertEquals(LogicalOperation.HDOP_OR, filterList.getOperator());
        LogicalFilter l1_left = (LogicalFilter) filterList.getFilterList().get(0);
        BasicFilter l1_right = (BasicFilter) filterList.getFilterList().get(1);

        //column_2 > 1200
        assertEquals(2, l1_right.getColumn().index());
        assertEquals(HDOP_GT, l1_right.getOperation());
        assertEquals(1200L, l1_right.getConstant().constant());

        assertEquals(LogicalOperation.HDOP_AND, l1_left.getOperator());
        BasicFilter l2_left = (BasicFilter) l1_left.getFilterList().get(0);
        BasicFilter l2_right = (BasicFilter) l1_left.getFilterList().get(1);

        //column_1 > '2008-02-01'
        assertEquals(1, l2_left.getColumn().index());
        assertEquals(HDOP_GT, l2_left.getOperation());
        assertEquals("2008-02-01", l2_left.getConstant().constant());

        //column_1 < '2008-12-01' (comment fixed: assertions below check index 1, not column_2)
        assertEquals(1, l2_right.getColumn().index());
        assertEquals(HDOP_LT, l2_right.getOperation());
        assertEquals("2008-12-01", l2_right.getConstant().constant());

    }

    @Test
    public void parseFilterWithLogicalOperation() throws Exception {
        // WhereSQLBuilder inherits getFilterObject from JdbcFilterBuilder
        WhereSQLBuilder builder = new WhereSQLBuilder(null);
        LogicalFilter filter = (LogicalFilter) builder.getFilterObject("a1c25s5dfirsto5a2c20s1d2o2l0");
        assertEquals(LogicalOperation.HDOP_AND, filter.getOperator());
        assertEquals(2, filter.getFilterList().size());
    }

    @Test
    public void parseNestedExpressionWithLogicalOperation() throws Exception {
        WhereSQLBuilder builder = new WhereSQLBuilder(null);
        LogicalFilter filter = (LogicalFilter) builder.getFilterObject("a1c25s5dfirsto5a2c20s1d2o2l0a1c20s1d1o1l1");
        assertEquals(LogicalOperation.HDOP_OR, filter.getOperator());
        assertEquals(LogicalOperation.HDOP_AND, ((LogicalFilter) filter.getFilterList().get(0)).getOperator());
        assertEquals(HDOP_LT, ((BasicFilter) filter.getFilterList().get(1)).getOperation());
    }

    @Test
    public void parseISNULLExpression() throws Exception {
        // IS NULL carries no constant operand
        WhereSQLBuilder builder = new WhereSQLBuilder(null);
        BasicFilter filter = (BasicFilter) builder.getFilterObject("a1o8");
        assertEquals(FilterParser.Operation.HDOP_IS_NULL, filter.getOperation());
        assertEquals(1, filter.getColumn().index());
        assertNull(filter.getConstant());
    }

    @Test
    public void parseISNOTNULLExpression() throws Exception {
        // IS NOT NULL carries no constant operand
        WhereSQLBuilder builder = new WhereSQLBuilder(null);
        BasicFilter filter = (BasicFilter) builder.getFilterObject("a1o9");
        assertEquals(FilterParser.Operation.HDOP_IS_NOT_NULL, filter.getOperation());
        assertEquals(1, filter.getColumn().index());
        assertNull(filter.getConstant());
    }

}
import org.apache.hawq.pxf.api.Fragment;
import org.apache.hawq.pxf.api.UserDataException;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.plugins.jdbc.utils.ByteUtil;
import org.junit.Test;

import java.text.ParseException;
import java.util.Calendar;
import java.util.List;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


/**
 * Unit tests for {@link JdbcPartitionFragmenter}: fragment counts and metadata
 * for date/int/enum partitioning, plus validation of malformed parameters.
 */
public class JdbcPartitionFragmenterTest {
    InputData inputData;

    @Test
    public void testPartionByDateOfMonth() throws Exception {
        prepareConstruction();
        when(inputData.getDataSource()).thenReturn("sales");
        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("cdate:date");
        when(inputData.getUserProperty("RANGE")).thenReturn("2008-01-01:2009-01-01");
        when(inputData.getUserProperty("INTERVAL")).thenReturn("1:month");

        JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
        List<Fragment> fragments = fragment.getFragments();
        assertEquals(fragments.size(), 12);

        //fragment - 1: metadata is two big-endian longs (start/end epoch millis)
        byte[] fragMeta = fragments.get(0).getMetadata();
        byte[][] newBytes = ByteUtil.splitBytes(fragMeta, 8);
        long fragStart = ByteUtil.toLong(newBytes[0]);
        long fragEnd = ByteUtil.toLong(newBytes[1]);
        assertDateEquals(fragStart, 2008, 1, 1);
        assertDateEquals(fragEnd, 2008, 2, 1);

        //fragment - 12
        fragMeta = fragments.get(11).getMetadata();
        newBytes = ByteUtil.splitBytes(fragMeta, 8);
        fragStart = ByteUtil.toLong(newBytes[0]);
        fragEnd = ByteUtil.toLong(newBytes[1]);
        assertDateEquals(fragStart, 2008, 12, 1);
        assertDateEquals(fragEnd, 2009, 1, 1);

        //when start_date > end_date, the range is empty (comment fixed: was "end_date > start_date")
        when(inputData.getUserProperty("RANGE")).thenReturn("2008-01-01:2001-01-01");
        fragment = new JdbcPartitionFragmenter(inputData);
        assertEquals(0, fragment.getFragments().size());
    }

    @Test
    public void testPartionByDateOfYear() throws Exception {
        prepareConstruction();
        when(inputData.getDataSource()).thenReturn("sales");
        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("cdate:date");
        when(inputData.getUserProperty("RANGE")).thenReturn("2008-01-01:2011-01-01");
        when(inputData.getUserProperty("INTERVAL")).thenReturn("1:year");

        JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
        List<Fragment> fragments = fragment.getFragments();
        assertEquals(fragments.size(), 3);
    }

    @Test
    public void testPartionByInt() throws Exception {
        prepareConstruction();
        when(inputData.getDataSource()).thenReturn("sales");
        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("year:int");
        when(inputData.getUserProperty("RANGE")).thenReturn("2001:2012");
        when(inputData.getUserProperty("INTERVAL")).thenReturn("2");

        JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
        List<Fragment> fragments = fragment.getFragments();
        assertEquals(fragments.size(), 6);

        //fragment - 1: metadata is two big-endian ints (start/end of the interval)
        byte[] fragMeta = fragments.get(0).getMetadata();
        byte[][] newBytes = ByteUtil.splitBytes(fragMeta, 4);
        int fragStart = ByteUtil.toInt(newBytes[0]);
        int fragEnd = ByteUtil.toInt(newBytes[1]);
        assertEquals(fragStart, 2001);
        assertEquals(fragEnd, 2003);

        //fragment - 6 (last interval is clamped to the range end)
        fragMeta = fragments.get(5).getMetadata();
        newBytes = ByteUtil.splitBytes(fragMeta, 4);
        fragStart = ByteUtil.toInt(newBytes[0]);
        fragEnd = ByteUtil.toInt(newBytes[1]);
        assertEquals(fragStart, 2011);
        assertEquals(fragEnd, 2012);

        //when start > end, the range is empty (comment fixed: was "end > start")
        when(inputData.getUserProperty("RANGE")).thenReturn("2013:2012");
        fragment = new JdbcPartitionFragmenter(inputData);
        assertEquals(0, fragment.getFragments().size());

    }

    @Test
    public void testPartionByEnum() throws Exception {
        prepareConstruction();
        when(inputData.getDataSource()).thenReturn("sales");
        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("level:enum");
        when(inputData.getUserProperty("RANGE")).thenReturn("excellent:good:general:bad");

        JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
        List<Fragment> fragments = fragment.getFragments();
        assertEquals(fragments.size(), 4);

        //fragment - 1: enum metadata is the raw value bytes
        byte[] fragMeta = fragments.get(0).getMetadata();
        assertEquals("excellent", new String(fragMeta));

        //fragment - 4
        fragMeta = fragments.get(3).getMetadata();
        assertEquals("bad", new String(fragMeta));
    }

    @Test
    public void inValidPartitiontype() throws Exception {
        prepareConstruction();
        when(inputData.getDataSource()).thenReturn("sales");
        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("level:float");
        when(inputData.getUserProperty("RANGE")).thenReturn("100:200");

        try {
            new JdbcPartitionFragmenter(inputData);
            // NOTE(review): message names IllegalArgumentException, but the catch
            // below expects UserDataException — the message looks stale.
            fail("Expected an IllegalArgumentException");
        } catch (UserDataException ex) {
            // expected: "float" is not a supported partition type
        }
    }

    @Test
    public void inValidParameterFormat() throws Exception {
        prepareConstruction();
        when(inputData.getDataSource()).thenReturn("sales");

        //PARTITION_BY must be a colon-delimited "column:type" string (comment fixed: was "comma-delimited")
        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("level-enum");
        when(inputData.getUserProperty("RANGE")).thenReturn("100:200");
        try {
            new JdbcPartitionFragmenter(inputData);
            fail("Expected an ArrayIndexOutOfBoundsException");
        } catch (ArrayIndexOutOfBoundsException ex) {
            // expected: splitting "level-enum" on ':' yields no type token
        }

        //date string must be yyyy-MM-dd
        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("cdate:date");
        when(inputData.getUserProperty("RANGE")).thenReturn("2008/01/01:2009-01-01");
        when(inputData.getUserProperty("INTERVAL")).thenReturn("1:month");
        try {
            JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
            fragment.getFragments();
            // NOTE(review): message names ParseException, but the catch below
            // expects UserDataException — presumably the fragmenter wraps the
            // underlying ParseException; confirm.
            fail("Expected an ParseException");
        } catch (UserDataException ex) {
            // expected: malformed date in RANGE
        }
    }

    @Test
    public void inValidParameterValue() throws Exception {
        prepareConstruction();
        //INTERVAL must be greater than 0
        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("cdate:date");
        when(inputData.getUserProperty("RANGE")).thenReturn("2008-01-01:2009-01-01");
        when(inputData.getUserProperty("INTERVAL")).thenReturn("-1:month");
        try {
            new JdbcPartitionFragmenter(inputData);
            fail("Expected an UserDataException");
        } catch (UserDataException ex) {
            // expected: non-positive interval
        }
    }

    @Test
    public void inValidIntervaltype() throws Exception {
        prepareConstruction();
        when(inputData.getDataSource()).thenReturn("sales");
        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("cdate:date");
        when(inputData.getUserProperty("RANGE")).thenReturn("2008-01-01:2011-01-01");
        when(inputData.getUserProperty("INTERVAL")).thenReturn("6:hour");

        try {
            JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
            fragment.getFragments();
            fail("Expected an UserDataException");
        } catch (UserDataException ex) {
            // expected: "hour" is not a supported interval unit
        }
    }

    @Test
    public void testNoPartition() throws Exception {
        prepareConstruction();
        when(inputData.getDataSource()).thenReturn("sales");

        // without partition properties the whole table is one fragment
        JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
        List<Fragment> fragments = fragment.getFragments();
        assertEquals(fragments.size(), 1);
    }

    // Asserts that an epoch-millis timestamp falls on the given local calendar date.
    private void assertDateEquals(long date, int year, int month, int day) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTimeInMillis(date);
        assertEquals(calendar.get(Calendar.YEAR), year);
        assertEquals(calendar.get(Calendar.MONTH), month - 1);
        assertEquals(calendar.get(Calendar.DAY_OF_MONTH), day);
    }

    // Creates the mocked InputData each test stubs with its own properties.
    private void prepareConstruction() throws Exception {
        inputData = mock(InputData.class);

    }
}
+ */ + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.*; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.io.DataType; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.*; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Validate SQL string generated by the {@link JdbcPartitionFragmenter#buildFragmenterSql} method + * and the {@link WhereSQLBuilder#buildWhereSQL} method. + */ +public class SqlBuilderTest { + private static final Log LOG = LogFactory.getLog(SqlBuilderTest.class); + static final String DB_PRODUCT = "mysql"; + static final String ORIGINAL_SQL = "select * from sales"; + InputData inputData; + + @Before + public void setup() throws Exception { + LOG.info("SqlBuilderTest.setup()"); + } + + @After + public void cleanup() throws Exception { + LOG.info("SqlBuilderTest.cleanup()"); + } + + @Test + public void testIdFilter() throws Exception { + prepareConstruction(); + when(inputData.hasFilter()).thenReturn(true); + when(inputData.getFilterString()).thenReturn("a0c20s1d1o5");//id=1 + + WhereSQLBuilder builder = new WhereSQLBuilder(inputData); + assertEquals("1=1 AND id=1", builder.buildWhereSQL(DB_PRODUCT)); + } + + @Test + public void testDateAndAmtFilter() throws Exception { + prepareConstruction(); + when(inputData.hasFilter()).thenReturn(true); + // cdate>'2008-02-01' and cdate<'2008-12-01' and amt > 1200 + when(inputData.getFilterString()).thenReturn("a1c25s10d2008-02-01o2a1c25s10d2008-12-01o1l0a2c20s4d1200o2l0"); + + WhereSQLBuilder builder = new WhereSQLBuilder(inputData); + assertEquals("1=1 AND cdate>DATE('2008-02-01') AND cdate<DATE('2008-12-01') AND amt>1200" + , builder.buildWhereSQL(DB_PRODUCT)); + } + + @Test + public void 
testUnsupportedOperationFilter() throws Exception { + prepareConstruction(); + when(inputData.hasFilter()).thenReturn(true); + // grade like 'bad' + when(inputData.getFilterString()).thenReturn("a3c25s3dbado7"); + + WhereSQLBuilder builder = new WhereSQLBuilder(inputData); + assertEquals(null, builder.buildWhereSQL(DB_PRODUCT)); + } + + @Test + public void testUnsupportedLogicalFilter() throws Exception { + prepareConstruction(); + when(inputData.hasFilter()).thenReturn(true); + // cdate>'2008-02-01' or amt < 1200 + when(inputData.getFilterString()).thenReturn("a1c25s10d2008-02-01o2a2c20s4d1200o2l1"); + + WhereSQLBuilder builder = new WhereSQLBuilder(inputData); + assertEquals(null, builder.buildWhereSQL(DB_PRODUCT)); + } + + @Test + public void testDatePartition() throws Exception { + prepareConstruction(); + when(inputData.hasFilter()).thenReturn(false); + when(inputData.getUserProperty("PARTITION_BY")).thenReturn("cdate:date"); + when(inputData.getUserProperty("RANGE")).thenReturn("2008-01-01:2009-01-01"); + when(inputData.getUserProperty("INTERVAL")).thenReturn("2:month"); + JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData); + List<Fragment> fragments = fragment.getFragments(); + assertEquals(6, fragments.size()); + + //partition-1 : cdate>=2008-01-01 and cdate<2008-03-01 + when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata()); + String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, ORIGINAL_SQL); + assertEquals(ORIGINAL_SQL + " WHERE 1=1 AND " + + "cdate >= DATE('2008-01-01') AND cdate < DATE('2008-03-01')", fragmentSql); + } + + @Test + public void testFilterAndPartition() throws Exception { + prepareConstruction(); + when(inputData.hasFilter()).thenReturn(true); + when(inputData.getFilterString()).thenReturn("a0c20s1d5o2"); //id>5 + when(inputData.getUserProperty("PARTITION_BY")).thenReturn("grade:enum"); + when(inputData.getUserProperty("RANGE")).thenReturn("excellent:good:general:bad"); + + 
WhereSQLBuilder builder = new WhereSQLBuilder(inputData); + String whereSql = builder.buildWhereSQL(DB_PRODUCT); + assertEquals("1=1 AND id>5", whereSql); + + JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData); + List<Fragment> fragments = fragment.getFragments(); + + //partition-1 : id>5 and grade='excellent' + when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata()); + + String filterSql = ORIGINAL_SQL + " WHERE " + whereSql; + String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, filterSql); + assertEquals(filterSql + " AND grade='excellent'", fragmentSql); + } + + @Test + public void testNoPartition() throws Exception { + prepareConstruction(); + when(inputData.hasFilter()).thenReturn(false); + JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData); + List<Fragment> fragments = fragment.getFragments(); + assertEquals(1, fragments.size()); + + when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata()); + + String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, ORIGINAL_SQL); + assertEquals(ORIGINAL_SQL, fragmentSql); + } + + + private void prepareConstruction() throws Exception { + inputData = mock(InputData.class); + when(inputData.getDataSource()).thenReturn("sales"); + + + ArrayList<ColumnDescriptor> columns = new ArrayList<>(); + columns.add(new ColumnDescriptor("id", DataType.INTEGER.getOID(), 0, "int4", null)); + columns.add(new ColumnDescriptor("cdate", DataType.DATE.getOID(), 1, "date", null)); + columns.add(new ColumnDescriptor("amt", DataType.FLOAT8.getOID(), 2, "float8", null)); + columns.add(new ColumnDescriptor("grade", DataType.TEXT.getOID(), 3, "text", null)); + when(inputData.getTupleDescription()).thenReturn(columns); + when(inputData.getColumn(0)).thenReturn(columns.get(0)); + when(inputData.getColumn(1)).thenReturn(columns.get(1)); + when(inputData.getColumn(2)).thenReturn(columns.get(2)); + 
when(inputData.getColumn(3)).thenReturn(columns.get(3)); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml index d36f54b..53f15bb 100644 --- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml +++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml @@ -168,4 +168,13 @@ under the License. <resolver>org.apache.hawq.pxf.plugins.json.JsonResolver</resolver> </plugins> </profile> + <profile> + <name>Jdbc</name> + <description>A profile for reading data into HAWQ via JDBC</description> + <plugins> + <fragmenter>org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter</fragmenter> + <accessor>org.apache.hawq.pxf.plugins.jdbc.JdbcReadAccessor</accessor> + <resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver</resolver> + </plugins> + </profile> </profiles> http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/settings.gradle ---------------------------------------------------------------------- diff --git a/pxf/settings.gradle b/pxf/settings.gradle index c610116..55b6aea 100644 --- a/pxf/settings.gradle +++ b/pxf/settings.gradle @@ -25,4 +25,5 @@ include 'pxf-service' include 'pxf-hdfs' include 'pxf-hive' include 'pxf-hbase' -include 'pxf-json' \ No newline at end of file +include 'pxf-json' +include 'pxf-jdbc' \ No newline at end of file
