HAWQ-1605. Support INSERT in PXF JDBC plugin

(closes #1353)

Fix incorrect TIMESTAMP handling

PXF JDBC plugin update

* Add support for INSERT queries:
        * The INSERT queries are processed by the same classes as the SELECT 
queries;
        * INSERTs are processed by the JDBC PreparedStatement;
        * INSERTs support batching (by means of JDBC);
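The batching described above can be sketched with a plain JDBC `PreparedStatement` (the table name, column, and helper below are illustrative, not the plugin's actual code):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Illustrative sketch of batched INSERT via a JDBC PreparedStatement.
public class BatchInsertSketch {
    // Pure helper: number of executeBatch() round trips for a given row count.
    public static int executeBatchCalls(int rows, int batchSize) {
        if (batchSize <= 1) {
            return rows;                            // batching disabled: one execution per row
        }
        return (rows + batchSize - 1) / batchSize;  // ceil(rows / batchSize)
    }

    public static void insertAll(Connection conn, String[] names, int batchSize) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement("INSERT INTO t (name) VALUES (?)")) {
            int inBatch = 0;
            for (String name : names) {
                ps.setString(1, name);
                ps.addBatch();             // accumulate rows instead of executing immediately
                if (++inBatch == batchSize) {
                    ps.executeBatch();     // one round trip for the whole batch
                    inBatch = 0;
                }
            }
            if (inBatch > 0) {
                ps.executeBatch();         // flush the remainder
            }
        }
    }
}
```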

* Minor changes in WhereSQLBuilder and JdbcPartitionFragmenter:
        * Removed 'WHERE 1=1';
        * The same pattern of spaces around operators everywhere ('a = b', not 
'a=b');
        * JdbcPartitionFragmenter.buildFragmenterSql() made static to avoid 
extra checks of InputData (proposed by @sansanichfb);

* Refactoring and some microoptimizations;

PXF JDBC refactoring

* The README.md is completely rewritten;

* Lots of changes in comments and javadoc comments;

* Code refactoring and minor changes in codestyle

Fixes proposed by @sansanichfb

Add DbProduct for Microsoft SQL Server

Notes on consistency in README and errors

* Add an explicit note on consistency of INSERT queries (it is not guaranteed).

* Change error message on INSERT failure

* Minor corrections of README

The fixes were proposed by @sansanichfb

Improve WhereSQLBuilder

* Add support of TIMESTAMP values;

* Add support of operations <>, LIKE, IS NULL, IS NOT NULL.

Fix proposed by @sansanichfb

Throw an exception when trying to open an already open connection when writing 
to an external database using `openForWrite()`.

Although the behaviour of `openForRead()` is different, it does not apply here. A 
second call to `openForWrite()` could be made from another thread, and that would 
result in a race: the `PreparedStatement` we use to write to an external database 
is the same object for all threads, and the procedure `writeNextObject()` is not 
`synchronized` (or "protected" in some other way).
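The guard can be sketched as follows (the field name `statementWrite` mirrors the accessor's actual field; the omitted connection setup is elided here):

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Sketch of the fail-fast guard against a second openForWrite() call.
public class WriteGuardSketch {
    private PreparedStatement statementWrite;

    public boolean openForWrite() throws SQLException {
        // A second open would let another thread share the same PreparedStatement,
        // racing with the unsynchronized writeNextObject(), so fail fast instead.
        if (statementWrite != null && !statementWrite.isClosed()) {
            throw new SQLException("The connection to an external database is already open.");
        }
        // ... create the connection and the PreparedStatement here ...
        return true;
    }
}
```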

Simplify logging; BatchUpdateException

Simplify logging so that the logs produced by pxf-jdbc do not grow too big when 
DEBUG is enabled. The removed logging calls reported field types and names, which 
in most cases match the data provided; exceptions are still logged.

Add processing of BatchUpdateException, so that the real cause of an exception 
is returned to the user.
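A `BatchUpdateException` chains the underlying error via `getNextException()`; the unwrapping can be sketched as (the helper name is illustrative):

```java
import java.sql.BatchUpdateException;
import java.sql.SQLException;

// Sketch of surfacing the real cause of a batch INSERT failure.
public class BatchErrorSketch {
    public static SQLException rootCause(SQLException e) {
        if (e instanceof BatchUpdateException) {
            // JDBC drivers chain the underlying error via getNextException()
            SQLException next = e.getNextException();
            if (next != null) {
                return next;
            }
        }
        return e;  // no chained cause: return the exception itself
    }
}
```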

PXF JDBC thread pool support

Implement support for multi-threaded processing of INSERT queries using a thread 
pool. To use the feature, set the parameter POOL_SIZE in the LOCATION clause of 
an external table (<1: pool size equals the number of CPUs available to the JVM; 
=1: disable the thread pool; >1: use the given pool size).

Not all operations are processed by pool threads: pool threads only execute() 
the queries, but they do not fill the PreparedStatement from OneRow.
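The POOL_SIZE resolution described above can be sketched as a small helper (the method name is illustrative):

```java
// Sketch of the POOL_SIZE semantics: <1 maps to the CPU count, 1 disables
// the pool, >1 is used as-is.
public class PoolSizeSketch {
    public static int effectivePoolSize(int poolSize) {
        if (poolSize < 1) {
            // <1: use the number of CPUs available to the JVM
            return Runtime.getRuntime().availableProcessors();
        }
        // =1: thread pool disabled; >1: pool of the given size
        return poolSize;
    }
}
```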

Redesign connection pooling

* Redesign connection pooling: move OneRow object processing to threads from 
the pool. This decreases the load on the single-threaded part of PXF;

* Introduce WriterCallable and related classes. This significantly simplifies the 
code of JdbcAccessor, makes it easy to introduce new methods of processing INSERT 
queries, and enables quick hardcoded tweaks for the same purpose.

* Add docs on thread pool feature

Support long values in PARTITION clause

Support values of Java primitive type 'long' in PARTITION clause (both for 
RANGE and INTERVAL variables).

* Modify JdbcPartitionFragmenter (convert all int variables to long)
* Move parsing of INTERVAL values for PARTITION_TYPE "INT" to class constructor 
(and add a parse exception handler)
* Simplify ByteUtil (remove methods to deal with values of type 'int')
* Update JdbcPartitionFragmenterTest
* Minor changes in comments
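The INTERVAL parsing with a parse exception handler can be sketched as (the class and message are illustrative, not the fragmenter's actual code):

```java
// Sketch of parsing an INTERVAL value for PARTITION_TYPE "INT" as a long,
// wrapping the parse error in an exception the caller can report.
public class IntervalParseSketch {
    public static long parseInterval(String raw) {
        try {
            return Long.parseLong(raw);  // 'long' now, so intervals beyond int range work
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "The INTERVAL value '" + raw + "' is not a valid integer", e);
        }
    }
}
```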

Fix pxf-profiles-default.xml

Remove ampersand from a description of JDBC profile from 
pxf-profiles-default.xml

Remove explicit throws of IllegalArgumentException

Remove explicit references to 'IllegalArgumentException', as the caller is 
probably unable to recover from them.
'IllegalStateException' is left unchanged, as it is thrown when the caller must 
perform an action that will resolve the problem ('WriterCallable' is full).

Other runtime exceptions are explicitly listed in function definitions as 
before; their causes are usually known to the caller, so it could do something 
about them or at least send a more meaningful message about the error cause to 
the user.

Proposed by Alex Denissov <[email protected]>

Simplify isCallRequired()

Make the body of 'isCallRequired()' a one-line expression in all 
implementations of 'WriterCallable'.

Proposed by Alex Denissov <[email protected]>

Remove rollback and change BATCH_SIZE logic

Remove calls to 'tryRollback()' and all processing of rollbacks in INSERT 
queries.
The reason for the change is that rollback is effective in only one case: the 
INSERT is performed from a single PXF segment that uses a single thread, and the 
external database supports transactions. In most cases, more than one PXF 
segment performs the INSERT, and rollback is of no use then.
On the other hand, rollback logic is cumbersome and notably increases code 
complexity.

Due to the removal of rollback, there is no longer a need to keep BATCH_SIZE 
infinite as often as possible (an infinite BATCH_SIZE reduces the number of 
scenarios in which rollback() fails, but does not bring it to zero).
Thus, setting a recommended 
(https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754) 
value makes sense.
The old logic of infinite batch size also remains active.

Modify README.md: minor corrections, new BATCH_SIZE logic

Proposed by Alex Denissov <[email protected]>

Change BATCH_SIZE logic

* Modify BATCH_SIZE parameter processing according to new proposals 
https://github.com/apache/hawq/pull/1353#discussion_r214413534
* Update README.md
* Restore fallback to non-batched INSERTs in case the external database (or 
JDBC connector) does not support batch updates

Proposed by Alex Denissov <[email protected]>
Proposed by Dmitriy Pavlov <[email protected]>

Modify processing of BATCH_SIZE parameter

Modify BATCH_SIZE parameter processing according to the proposal 
https://github.com/apache/hawq/pull/1353#discussion_r215023775:
* Update allowed values of BATCH_SIZE and their meanings
* Introduce an explicit flag for the presence of the BATCH_SIZE parameter
* Introduce DEFAULT_BATCH_SIZE constant in JdbcPlugin
* Move processing of BATCH_SIZE values to JdbcAccessor
* Update README.md
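The resulting BATCH_SIZE resolution can be sketched as a helper (the parameter `batchSizeIsSetByUser` mirrors the explicit presence flag; the default of 100 is the recommended value mentioned earlier; the helper name itself is illustrative):

```java
// Sketch of the final BATCH_SIZE semantics: batching is on by default with
// a recommended size; 0 or 1 disables it; larger values are used as-is.
public class BatchSizeSketch {
    static final int DEFAULT_BATCH_SIZE = 100;

    public static int effectiveBatchSize(boolean batchSizeIsSetByUser, int batchSize) {
        if (!batchSizeIsSetByUser) {
            return DEFAULT_BATCH_SIZE;  // parameter absent: batching on by default
        }
        if (batchSize == 0 || batchSize == 1) {
            return 1;                   // batching disabled
        }
        return batchSize;               // batch of the given size
    }
}
```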

Proposed by @divyabhargov, @denalex

Fix column type for columns converted to TEXT

Modify column type processing so that the column type is set correctly for 
fields that:
* Are represented as columns of type TEXT by GPDBWritable, but whose actual 
type is different
* Contain NULL value

Previously, the column type code was not set correctly for such columns because 
of a check on NULL field values.

Proposed and authored by @divyabhargov

Remove parseUnsignedInt


Project: http://git-wip-us.apache.org/repos/asf/hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/hawq/commit/472fa2b7
Tree: http://git-wip-us.apache.org/repos/asf/hawq/tree/472fa2b7
Diff: http://git-wip-us.apache.org/repos/asf/hawq/diff/472fa2b7

Branch: refs/heads/master
Commit: 472fa2b74679a5d2b6fb08dd107b292a95e577b7
Parents: a741655
Author: Ivan Leskin <[email protected]>
Authored: Mon Mar 5 17:19:26 2018 +0300
Committer: Alexander Denissov <[email protected]>
Committed: Fri Sep 7 12:23:24 2018 -0700

----------------------------------------------------------------------
 pxf/pxf-jdbc/README.md                          | 341 ++++++++++------
 .../hawq/pxf/plugins/jdbc/JdbcAccessor.java     | 353 +++++++++++++++++
 .../pxf/plugins/jdbc/JdbcFilterBuilder.java     |  75 ++--
 .../plugins/jdbc/JdbcPartitionFragmenter.java   | 391 ++++++++++---------
 .../hawq/pxf/plugins/jdbc/JdbcPlugin.java       | 226 ++++++++---
 .../hawq/pxf/plugins/jdbc/JdbcReadAccessor.java | 122 ------
 .../hawq/pxf/plugins/jdbc/JdbcReadResolver.java | 103 -----
 .../hawq/pxf/plugins/jdbc/JdbcResolver.java     | 367 +++++++++++++++++
 .../hawq/pxf/plugins/jdbc/WhereSQLBuilder.java  | 162 +++++---
 .../hawq/pxf/plugins/jdbc/utils/ByteUtil.java   |  38 +-
 .../hawq/pxf/plugins/jdbc/utils/DbProduct.java  |  45 ++-
 .../plugins/jdbc/utils/MicrosoftProduct.java    |  35 ++
 .../pxf/plugins/jdbc/utils/MysqlProduct.java    |  10 +-
 .../pxf/plugins/jdbc/utils/OracleProduct.java   |  11 +-
 .../pxf/plugins/jdbc/utils/PostgresProduct.java |  11 +-
 .../writercallable/BatchWriterCallable.java     | 109 ++++++
 .../writercallable/SimpleWriterCallable.java    | 102 +++++
 .../jdbc/writercallable/WriterCallable.java     |  56 +++
 .../writercallable/WriterCallableFactory.java   |  97 +++++
 .../jdbc/JdbcPartitionFragmenterTest.java       |  16 +-
 .../hawq/pxf/plugins/jdbc/SqlBuilderTest.java   |  54 +--
 .../src/main/resources/pxf-profiles-default.xml |   6 +-
 22 files changed, 1984 insertions(+), 746 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/README.md
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/README.md b/pxf/pxf-jdbc/README.md
index c158f32..fde1641 100644
--- a/pxf/pxf-jdbc/README.md
+++ b/pxf/pxf-jdbc/README.md
@@ -1,139 +1,254 @@
-# Accessing Jdbc Table Data
+# PXF JDBC plugin
 
-The PXF JDBC plug-in reads data stored in Traditional relational database,ie : 
mysql,ORACLE,postgresql.
+The PXF JDBC plugin provides access to external databases that implement [the Java Database Connectivity API](http://www.oracle.com/technetwork/java/javase/jdbc/index.html). Both read (SELECT) and write (INSERT) operations are supported by the plugin.
 
-PXF-JDBC plug-in is the client of the database, the host running the database 
engine does not need to
-deploy PXF.
+The PXF JDBC plugin is a JDBC client. The host running the external database does not need to deploy PXF.
 
 
-# Prerequisites
+## Prerequisites
 
-Check the following before using PXF to access JDBC Table:
-* The PXF JDBC plug-in is installed on all cluster nodes.
-* The JDBC JAR files are installed on all cluster nodes, and added to file - 
'pxf-public.classpath'
-* You have tested PXF on HDFS.
+Check the following before using the PXF JDBC plugin:
 
-# Using PXF Tables to Query JDBC Table
-Jdbc tables are defined in same schema in PXF.The PXF table has the same 
column name
-as Jdbc Table, and the column type requires a mapping of Jdbc-HAWQ.
+* The PXF JDBC plugin is installed on all PXF nodes;
+* The JDBC driver for the external database is installed on all PXF nodes;
+* All PXF nodes are allowed to connect to the external database.
 
-## Syntax Example
-The following PXF table definition is valid for Jdbc Table.
 
-    CREATE [READABLE|WRITABLE] EXTERNAL TABLE table_name
-        ( column_name data_type [, ...] | LIKE other_table )
-    LOCATION 
('pxf://namenode[:port]/jdbc-schema-name.jdbc-table-name?<pxf-parameters><&custom-parameters>')
-    FORMAT 'CUSTOM' (formatter='pxfwritable_import')
-If `jdbc-schema-name` is omitted, pxf will default to the `default` schema.
+## Limitations
 
-The `column_name` must exists in jdbc-table,`data_type` equals or similar to
-the jdbc-column type.
+Both the **PXF table** and the **table in the external database** must have the **same definition**. Their columns must have the same names, and the columns' types must correspond.
 
-where `<pxf-parameters>` is:
+**Not all data types are supported** by the plugin. The following PXF data 
types are supported:
 
-    [FRAGMENTER=org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter
-    &ACCESSOR=org.apache.hawq.pxf.plugins.jdbc.JdbcReadAccessor
-    &RESOLVER=org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver]
-    | PROFILE=Jdbc
+* `INTEGER`, `BIGINT`, `SMALLINT`
+* `REAL`, `FLOAT8`
+* `NUMERIC`
+* `BOOLEAN`
+* `VARCHAR`, `BPCHAR`, `TEXT`
+* `DATE`
+* `TIMESTAMP`
+* `BYTEA`
 
-where `<custom-parameters>` is:
+The `<full_external_table_name>` (see below) **must not match** the [pattern](https://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html) `/.*/[0-9]*-[0-9]*_[0-9]*`: the name must not both start with `/` and end with `/` followed by three groups of digits of arbitrary length, the first two separated by `-` and the last two separated by `_`. For example, the table name `/public.table/1-2_3` is not allowed.
 
-    JDBC_DRIVER=<jdbc-driver-class-name>
-     &DB_URL=<jdbc-url>&USER=<database-user>&PASS=<password>
+At the moment, one PXF external table cannot serve both SELECT and INSERT queries. A separate PXF external table is required for each type of query.
 
-## Jdbc Table to HAWQ Data Type Mapping
-Jdbc-table and hawq-table data type system is similar to, does not require
-a special type of mapping.
-# Usage
-The following to mysql, for example, describes the use of PDF-JDBC.
 
-To query MySQL Table in HAWQ, perform the following steps:
-1. create Table in MySQL
+## Syntax
+```
+CREATE [ READABLE | WRITABLE ] EXTERNAL TABLE <table_name> (
+    { <column_name> <data_type> [, ...] | LIKE <other_table> }
+)
+LOCATION (
+    
'pxf://<full_external_table_name>?<pxf_parameters><jdbc_required_parameters><jdbc_login_parameters><plugin_parameters>'
+)
+FORMAT 'CUSTOM' (FORMATTER={'pxfwritable_import' | 'pxfwritable_export'})
+```
 
-         mysql> use demodb;
-         mysql> create table myclass(
-                 id int(4) not null primary key,
-                 name varchar(20) not null,
-                 gender int(4) not null default '0',
-                 degree double(16,2));`
-2. insert test data
+The **`<pxf_parameters>`** are:
+```
+{
+PROFILE=JDBC
+|
+FRAGMENTER=org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter
+&ACCESSOR=org.apache.hawq.pxf.plugins.jdbc.JdbcAccessor
+&RESOLVER=org.apache.hawq.pxf.plugins.jdbc.JdbcResolver
+}
+```
 
-        insert into myclass values(1,"tom",1,90);
-        insert into myclass values(2,'john',0,94);
-        insert into myclass values(3,'simon',1,79);
-3. copy mysql-jdbc jar files to `/usr/lib/pxf` (on all cluster nodes), and
-edit `/etc/pxf/conf/pxf-public.classpath` , add :
+The **`<jdbc_required_parameters>`** are:
+```
+&JDBC_DRIVER=<external_database_jdbc_driver>
+&DB_URL=<external_database_url>
+```
 
-        /usr/lib/pxf/mysql-connector-java-*.jar
+The **`<jdbc_login_parameters>`** are **optional**, but if provided, both of 
them must be present:
+```
+&USER=<external_database_login>
+&PASS=<external_database_password>
+```
 
-     Restart all pxf-engine.
+The **`<plugin_parameters>`** are **optional**:
 
-4. create Table in HAWQ:
+```
+[
+&BATCH_SIZE=<batch_size>
+]
+[
+&POOL_SIZE=<pool_size>
+]
+[
+&PARTITION_BY=<column>:<column_type>
+&RANGE=<start_value>:<end_value>
+[&INTERVAL=<value>[:<unit>]]
+]
+```
 
-        gpadmin=# CREATE EXTERNAL TABLE myclass(id integer,
-             name text,
-             gender integer,
-             degree float8)
-             LOCATION ('pxf://localhost:51200/demodb.myclass'
-                     '?PROFILE=JDBC'
-                     '&JDBC_DRIVER=com.mysql.jdbc.Driver'
-                     
'&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
-                     )
-            FORMAT 'CUSTOM' (Formatter='pxfwritable_import');
+The meaning of `BATCH_SIZE` is given in section [batching of INSERT 
queries](#Batching).
 
-MySQL instance IP: 192.168.200.6, port: 3306.
+The meaning of `POOL_SIZE` is given in section [using thread pool for INSERT queries](#Thread_pool).
 
-5. query mysql data in HAWQ:
+The meaning of other parameters is given in section 
[partitioning](#Partitioning).
 
-        gpadmin=# select * from myclass;
-        gpadmin=# select * from myclass where id=2;
 
-# Jdbc Table Fragments
-## intro
-PXF-JDBC plug-in as a  client to access jdbc database.By default, there is
-only one pxf-instance connectied JDBC Table.If the jdbc table data is large,
-you can also use multiple pxf-instance to access the JDBC table by fragments.
+## SELECT queries
 
-## Syntax
-where `<custom-parameters>` can use following partition parameters:
-
-    
PARTITION_BY=column_name:column_type&RANGE=start_value[:end_value]&INTERVAL=interval_num[:interval_unit]
-The `PARTITION_BY` parameter indicates which  column to use as the partition 
column.
-It can be split by colon(':'),the `column_type` current supported : 
`date|int|enum` .
-The Date format is `yyyy-MM-dd`.
-The `PARTITION_BY` parameter can be null, and there will be only one fragment.
-
-The `RANGE` parameter indicates the range of data to be queried , it can be 
split by colon(':').
- The range is left-closed, ie: `>= start_value AND < end_value` .
-
-The `INTERVAL` parameter can be split by colon(':'), indicate the interval
- value of one fragment. When `column_type` is `date`,this parameter must
- be split by colon, and `interval_unit` can be `year|month|day`. When
- `column_type` is int, the `interval_unit` can be empty. When `column_type`
- is enum,the `INTERVAL` parameter can be empty.
-
-The syntax examples is :
-
-    * 
PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month'
-    * PARTITION_BY=year:int&RANGE=2008:2010&INTERVAL=1
-    * PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad
-
-## Usage
-MySQL Table:
-
-    CREATE TABLE sales (id int primary key, cdate date, amt 
decimal(10,2),grade varchar(30))
-HAWQ Table:
-
-    CREATE EXTERNAL TABLE sales(id integer,
-                 cdate date,
-                 amt float8,
-                 grade text)
-                 LOCATION ('pxf://localhost:51200/sales'
-                         '?PROFILE=JDBC'
-                         '&JDBC_DRIVER=com.mysql.jdbc.Driver'
-                         
'&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
-                         
'&PARTITION_BY=cdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:year'
-                         )
-                 FORMAT 'CUSTOM' (Formatter='pxfwritable_import');
-At PXF-JDBC plugin,this will generate 2 fragments.Then HAWQ assign these 
fragments to 2 PXF-instance
-to access jdbc table data.
\ No newline at end of file
+The PXF JDBC plugin allows performing SELECT queries on external tables.
+
+To perform SELECT queries, create an `EXTERNAL READABLE TABLE` or just an 
`EXTERNAL TABLE` with `FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import')` in PXF.
+
+The `BATCH_SIZE` parameter is not used in such tables. *However*, if this 
parameter is present, its value will be checked for correctness (it must be an 
integer).
+
+
+## INSERT queries
+
+The PXF JDBC plugin allows performing INSERT queries on external tables. Note that **the plugin does not guarantee consistency for INSERT queries**. Use a staging table in the external database to deal with this.
+
+To perform INSERT queries, create an `EXTERNAL WRITABLE TABLE` with `FORMAT 
'CUSTOM' (FORMATTER='pxfwritable_export')` in PXF.
+
+The `PARTITION_BY`, `RANGE` and `INTERVAL` parameters in such tables are 
ignored.
+
+
+### Batching
+
+INSERT queries can be batched. This may significantly increase performance if batching is supported by the external database.
+
+Batching is enabled by default, and the default batch size is `100` (this is a 
[recommended](https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754)
 value). To control this feature, create an external table with the parameter 
`BATCH_SIZE` set to:
+* `0` or `1`. Batching will be disabled;
+* `integer > 1`. The batch will be of the given size.
+
+Batching must be supported by the JDBC driver of an external database. If the 
driver does not support batching, behaviour depends on the `BATCH_SIZE` 
parameter:
+* `BATCH_SIZE` is not present, or `BATCH_SIZE` is `0` or `1`. PXF will try to execute the INSERT query without batching;
+* `BATCH_SIZE` is an `integer > 1`. The INSERT query will fail with an appropriate error message.
+
+
+## Thread pool
+
+The INSERT queries can be processed by multiple threads. This may significantly increase performance if the external database can handle multiple connections simultaneously.
+
+It is recommended to use batching together with a thread pool. In this case, each thread receives data from one (whole) batch and processes it. If a thread pool is used without batching, each thread in the pool will receive exactly one tuple; as a rule, this takes much more time than a usual single-threaded INSERT.
+
+If any of the threads from the pool fails, the user will get an error message. However, if an INSERT fails, some data may still have been inserted into the external database.
+
+To enable a thread pool, create an external table with the parameter `POOL_SIZE` set to:
+* `integer < 1`. The number of threads in a pool will be equal to the number 
of CPUs in the system;
+* `integer > 1`. Thread pool will consist of the given number of threads;
+* `1`. Thread pool will be disabled.
+
+By default (`POOL_SIZE` is absent), a thread pool is not used.
+
+
+## Partitioning
+
+The PXF JDBC plugin supports simultaneous read access to an external table from multiple PXF segments. This feature is called partitioning.
+
+
+### Syntax
+
+Use the following `<plugin_parameters>` (mentioned above) to activate 
partitioning:
+
+```
+&PARTITION_BY=<column>:<column_type>
+&RANGE=<start_value>:<end_value>
+[&INTERVAL=<value>[:<unit>]]
+```
+
+* The `PARTITION_BY` parameter indicates which column to use as the partition column. Only one column can be used as the partition column.
+    * The `<column>` is the name of a partition column;
+    * The `<column_type>` is the data type of a partition column. At the 
moment, the **supported types** are `INT`, `DATE` and `ENUM`.
+
+* The `RANGE` parameter indicates the range of data to be queried.
+    * If the partition type is `ENUM`, the `RANGE` parameter must be a list of 
values, each of which forms its own fragment;
+    * If the partition type is `INT` or `DATE`, the `RANGE` parameter must be 
a finite left-closed range ( `... >= start_value AND ... < end_value`);
+    * For `DATE` partitions, the date format must be `yyyy-MM-dd`.
+
+* The `INTERVAL` parameter is **required** for `INT` and `DATE` partitions. It 
is ignored if `<column_type>` is `ENUM`.
+    * The `<value>` is the size of each fragment (the last one may be made 
smaller by the plugin);
+    * The `<unit>` **must** be provided if `<column_type>` is `DATE`. `year`, 
`month` and `day` are supported. This parameter is ignored in case of any other 
`<column_type>`.
+
+Example partitions:
+* `&PARTITION_BY=id:int&RANGE=42:142&INTERVAL=2`
+* `&PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month`
+* `&PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad`
+
+
+### Mechanism
+
+If partitioning is activated, the SELECT query is split into a set of small 
queries, each of which is called a *fragment*. All the fragments are processed 
by separate PXF instances simultaneously. If there are more fragments than PXF 
instances, some instances will process more than one fragment; if only one PXF 
instance is available, it will process all the fragments.
+
+Extra query constraints (`WHERE` expressions) are automatically added to each 
fragment to guarantee that every tuple of data is retrieved from the external 
database exactly once.
+
+
+### Partitioning example
+Consider the following MySQL table:
+```
+CREATE TABLE sales (
+    id int primary key,
+    cdate date,
+    amt decimal(10,2),
+    grade varchar(30)
+)
+```
+and the following HAWQ table:
+```
+CREATE EXTERNAL TABLE sales(
+    id integer,
+    cdate date,
+    amt float8,
+    grade text
+)
+LOCATION 
('pxf://sales?PROFILE=JDBC&JDBC_DRIVER=com.mysql.jdbc.Driver&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&PARTITION_BY=cdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:year')
+FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
+```
+
+The PXF JDBC plugin will generate two fragments for the query `SELECT * FROM sales`. HAWQ will then assign each of them to a separate PXF segment. Each segment will perform the SELECT query; the first will get tuples with `cdate` values from year `2008`, while the second will get tuples from year `2009`. Each PXF segment will then send its results back to HAWQ, where they will be "concatenated" and returned.
+
+
+## Examples
+
+The following example shows how to access a MySQL table via JDBC.
+
+Suppose a MySQL instance is available at `192.168.200.6:3306`. A table is created in MySQL:
+```
+use demodb;
+create table myclass(
+    id int(4) not null primary key,
+    name varchar(20) not null,
+    degree double(16,2)
+);
+```
+
+Then some data is inserted into the MySQL table:
+```
+insert into myclass values(1, 'tom', 90);
+insert into myclass values(2, 'john', 94);
+insert into myclass values(3, 'simon', 79);
+```
+
+The MySQL JDBC driver files (JAR) are copied to `/usr/lib/pxf` on all cluster 
nodes and a line is added to `/etc/pxf/conf/pxf-public.classpath`:
+```
+/usr/lib/pxf/mysql-connector-java-*.jar
+```
+
+After this, all PXF segments are restarted.
+
+Then a table in HAWQ is created:
+```
+CREATE EXTERNAL TABLE myclass(
+    id integer,
+    name text,
+    degree float8
+)
+LOCATION (
+    
'pxf://localhost:51200/demodb.myclass?PROFILE=JDBC&JDBC_DRIVER=com.mysql.jdbc.Driver&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
+)
+FORMAT 'CUSTOM' (
+    FORMATTER='pxfwritable_import'
+);
+```
+
+Finally, a query to a HAWQ external table is made:
+```
+SELECT * FROM myclass;
+SELECT id, name FROM myclass WHERE id = 2;
+```

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java
new file mode 100644
index 0000000..2cacafd
--- /dev/null
+++ 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java
@@ -0,0 +1,353 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.WriteAccessor;
+import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallable;
+import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallableFactory;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.sql.Statement;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * JDBC tables accessor
+ *
+ * The SELECT queries are processed by {@link java.sql.Statement}
+ *
+ * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
+ * built-in JDBC batches of arbitrary size
+ */
+public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, 
WriteAccessor {
+    /**
+     * Class constructor
+     */
+    public JdbcAccessor(InputData inputData) throws UserDataException {
+        super(inputData);
+    }
+
+    /**
+     * openForRead() implementation
+     * Create query, open JDBC connection, execute query and store the result 
into resultSet
+     *
+     * @throws SQLException if a database access error occurs
+     * @throws SQLTimeoutException if a problem with the connection occurs
+     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
+     * @throws ClassNotFoundException if the JDBC driver was not found
+     */
+    @Override
+    public boolean openForRead() throws SQLException, SQLTimeoutException, 
ParseException, ClassNotFoundException {
+        if (statementRead != null && !statementRead.isClosed()) {
+            return true;
+        }
+
+        Connection connection = super.getConnection();
+
+        queryRead = buildSelectQuery(connection.getMetaData());
+        statementRead = connection.createStatement();
+        resultSetRead = statementRead.executeQuery(queryRead);
+
+        return true;
+    }
+
+    /**
+     * readNextObject() implementation
+     * Retrieve the next tuple from resultSet and return it
+     *
+     * @throws SQLException if a problem in resultSet occurs
+     */
+    @Override
+    public OneRow readNextObject() throws SQLException {
+        if (resultSetRead.next()) {
+            return new OneRow(resultSetRead);
+        }
+        return null;
+    }
+
+    /**
+     * closeForRead() implementation
+     */
+    @Override
+    public void closeForRead() {
+        JdbcPlugin.closeStatement(statementRead);
+    }
+
+    /**
+     * openForWrite() implementation
+     * Create query template and open JDBC connection
+     *
+     * @throws SQLException if a database access error occurs
+     * @throws SQLTimeoutException if a problem with the connection occurs
+     * @throws ParseException if the SQL statement provided in PXF InputData 
is incorrect
+     * @throws ClassNotFoundException if the JDBC driver was not found
+     */
+    @Override
+    public boolean openForWrite() throws SQLException, SQLTimeoutException, 
ParseException, ClassNotFoundException {
+        if (statementWrite != null && !statementWrite.isClosed()) {
+            throw new SQLException("The connection to an external database is 
already open.");
+        }
+
+        Connection connection = super.getConnection();
+
+        queryWrite = buildInsertQuery();
+        statementWrite = super.getPreparedStatement(connection, queryWrite);
+
+        // Process batchSize
+        if (!connection.getMetaData().supportsBatchUpdates()) {
+            if ((batchSizeIsSetByUser) && (batchSize > 1)) {
+                throw new SQLException("The external database does not support 
batch updates");
+            }
+            else {
+                batchSize = 1;
+            }
+        }
+
+        // Process poolSize
+        if (poolSize < 1) {
+            poolSize = Runtime.getRuntime().availableProcessors();
+            LOG.info(
+                "The POOL_SIZE is set to the number of CPUs available (" + 
Integer.toString(poolSize) + ")"
+            );
+        }
+        if (poolSize > 1) {
+            executorServiceWrite = Executors.newFixedThreadPool(poolSize);
+            poolTasks = new LinkedList<>();
+        }
+
+        // Setup WriterCallableFactory
+        writerCallableFactory = new WriterCallableFactory();
+        writerCallableFactory.setPlugin(this);
+        writerCallableFactory.setQuery(queryWrite);
+        writerCallableFactory.setBatchSize(batchSize);
+        if (poolSize == 1) {
+            writerCallableFactory.setStatement(statementWrite);
+        }
+
+        writerCallable = writerCallableFactory.get();
+
+        return true;
+    }
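The batch-size fallback above is worth isolating: an explicitly requested batch size greater than 1 is an error when the driver cannot batch, otherwise the size silently degrades to 1. A minimal standalone sketch (hypothetical helper, not part of the plugin):

```java
// Hypothetical sketch of the batchSize handling in openForWrite().
public class BatchSizeSketch {
    /**
     * @param supportsBatchUpdates value of DatabaseMetaData.supportsBatchUpdates()
     * @param batchSizeIsSetByUser whether BATCH_SIZE was passed explicitly
     * @param batchSize requested batch size
     * @return the effective batch size
     * @throws IllegalStateException if the user asked for batching the driver cannot do
     */
    public static int effectiveBatchSize(boolean supportsBatchUpdates, boolean batchSizeIsSetByUser, int batchSize) {
        if (!supportsBatchUpdates) {
            if (batchSizeIsSetByUser && batchSize > 1) {
                throw new IllegalStateException("The external database does not support batch updates");
            }
            return 1;  // silent fallback when the user did not ask for batching
        }
        return batchSize;
    }
}
```

Note the asymmetry: the error is raised only for an explicit user request, so defaults keep working against non-batching drivers.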
+
+    /**
+     * writeNextObject() implementation
+     *
+     * If batchSize is not 0 or 1, add a tuple to the batch of statementWrite
+     * Otherwise, execute an INSERT query immediately
+     *
+     * In both cases, a {@link java.sql.PreparedStatement} is used
+     *
+     * @throws SQLException if a database access error occurs
+     * @throws IOException if the data provided by {@link JdbcResolver} is corrupted
+     * @throws ClassNotFoundException if pooling is used and the JDBC driver was not found
+     * @throws IllegalStateException if writerCallableFactory was not properly initialized
+     * @throws Exception if it happens in writerCallable.call()
+     */
+    @Override
+    public boolean writeNextObject(OneRow row) throws Exception {
+        if (writerCallable == null) {
+            throw new IllegalStateException("The JDBC connection was not properly initialized (writerCallable is null)");
+        }
+
+        writerCallable.supply(row);
+        if (writerCallable.isCallRequired()) {
+            if (poolSize > 1) {
+                // Pooling is used. Create new writerCallable
+                poolTasks.add(executorServiceWrite.submit(writerCallable));
+                writerCallable = writerCallableFactory.get();
+            }
+            else {
+                // Pooling is not used
+                writerCallable.call();
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * closeForWrite() implementation
+     *
+     * @throws SQLException if a database access error occurs
+     * @throws Exception if it happens in writerCallable.call() or due to runtime errors in thread pool
+     */
+    @Override
+    public void closeForWrite() throws Exception {
+        if ((statementWrite == null) || (writerCallable == null)) {
+            return;
+        }
+
+        try {
+            if (poolSize > 1) {
+                // Process thread pool
+                Exception firstException = null;
+                for (Future<SQLException> task : poolTasks) {
+                    // We need this construction to ensure that we try to close all connections opened by pool threads
+                    try {
+                        SQLException currentSqlException = task.get();
+                        if (currentSqlException != null) {
+                            if (firstException == null) {
+                                firstException = currentSqlException;
+                            }
+                            LOG.error(
+                                "A SQLException in a pool thread occurred: " + currentSqlException.getClass() + " " + currentSqlException.getMessage()
+                            );
+                        }
+                    }
+                    catch (Exception e) {
+                        // This exception must have been caused by a thread execution error. However, another exception (possibly a SQLException) may have occurred in one of the threads not yet examined; that is why we do not modify firstException
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                "A runtime exception in a thread pool occurred: " + e.getClass() + " " + e.getMessage()
+                            );
+                        }
+                    }
+                }
+                try {
+                    executorServiceWrite.shutdown();
+                    executorServiceWrite.shutdownNow();
+                }
+                catch (Exception e) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("executorServiceWrite.shutdown() or .shutdownNow() threw an exception: " + e.getClass() + " " + e.getMessage());
+                    }
+                }
+                if (firstException != null) {
+                    throw firstException;
+                }
+            }
+
+            // Send data that is left
+            writerCallable.call();
+        }
+        finally {
+            JdbcPlugin.closeStatement(statementWrite);
+        }
+    }
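The draining loop in closeForWrite() implements a pattern worth naming: remember the first failure reported by a task, but never let one failed Future.get() stop the examination of the remaining tasks. A minimal standalone sketch (hypothetical helper using Future&lt;Exception&gt; results, mirroring the plugin's Future&lt;SQLException&gt;):

```java
import java.util.List;
import java.util.concurrent.Future;

// Hypothetical sketch of the exception-collection loop in closeForWrite().
public class PoolDrainSketch {
    public static Exception drain(List<Future<Exception>> tasks) {
        Exception firstException = null;
        for (Future<Exception> task : tasks) {
            try {
                Exception current = task.get();  // null means the task succeeded
                if (current != null && firstException == null) {
                    firstException = current;
                }
            }
            catch (Exception e) {
                // A thread execution error; keep draining the remaining tasks
            }
        }
        return firstException;
    }
}
```

Returning (or rethrowing) only the first exception keeps the caller's error report deterministic while the rest are merely logged.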
+
+
+    /**
+     * Build SELECT query (with "WHERE" and partition constraints)
+     *
+     * @return Complete SQL query
+     *
+     * @throws ParseException if the constraints passed in InputData are incorrect
+     * @throws SQLException if the database metadata is invalid
+     */
+    private String buildSelectQuery(DatabaseMetaData databaseMetaData) throws ParseException, SQLException {
+        if (databaseMetaData == null) {
+            throw new IllegalArgumentException("The provided databaseMetaData is null");
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append("SELECT ");
+
+        // Insert columns' names
+        String columnDivisor = "";
+        for (ColumnDescriptor column : columns) {
+            sb.append(columnDivisor);
+            columnDivisor = ", ";
+            sb.append(column.columnName());
+        }
+
+        // Insert the table name
+        sb.append(" FROM ").append(tableName);
+
+        // Insert regular WHERE constraints
+        (new WhereSQLBuilder(inputData)).buildWhereSQL(databaseMetaData.getDatabaseProductName(), sb);
+
+        // Insert partition constraints
+        JdbcPartitionFragmenter.buildFragmenterSql(inputData, databaseMetaData.getDatabaseProductName(), sb);
+
+        return sb.toString();
+    }
+
+    /**
+     * Build INSERT query template (field values are replaced by placeholders '?')
+     *
+     * @return SQL query with placeholders instead of actual values
+     */
+    private String buildInsertQuery() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("INSERT INTO ");
+
+        // Insert the table name
+        sb.append(tableName);
+
+        // Insert columns' names
+        sb.append("(");
+        String fieldDivisor = "";
+        for (ColumnDescriptor column : columns) {
+            sb.append(fieldDivisor);
+            fieldDivisor = ", ";
+            sb.append(column.columnName());
+        }
+        sb.append(")");
+
+        sb.append(" VALUES ");
+
+        // Insert values placeholders
+        sb.append("(");
+        fieldDivisor = "";
+        for (int i = 0; i < columns.size(); i++) {
+            sb.append(fieldDivisor);
+            fieldDivisor = ", ";
+            sb.append("?");
+        }
+        sb.append(")");
+
+        return sb.toString();
+    }
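The template produced by buildInsertQuery() can be reproduced in a standalone sketch (hypothetical version where column names come from a plain List&lt;String&gt; instead of PXF ColumnDescriptors):

```java
import java.util.List;

// Hypothetical standalone version of buildInsertQuery().
public class InsertQuerySketch {
    public static String buildInsertQuery(String tableName, List<String> columnNames) {
        StringBuilder sb = new StringBuilder("INSERT INTO ").append(tableName).append("(");
        String fieldDivisor = "";
        for (String column : columnNames) {
            sb.append(fieldDivisor).append(column);
            fieldDivisor = ", ";
        }
        sb.append(") VALUES (");
        fieldDivisor = "";
        for (int i = 0; i < columnNames.size(); i++) {
            // Placeholder; actual values are bound later by the PreparedStatement
            sb.append(fieldDivisor).append("?");
            fieldDivisor = ", ";
        }
        return sb.append(")").toString();
    }
}
```

Using '?' placeholders (rather than concatenating values) is what lets the same PreparedStatement be reused for every tuple and batched by the JDBC driver.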
+
+    // Read variables
+    private String queryRead = null;
+    private Statement statementRead = null;
+    private ResultSet resultSetRead = null;
+
+    // Write variables
+    private String queryWrite = null;
+    private PreparedStatement statementWrite = null;
+    private WriterCallableFactory writerCallableFactory = null;
+    private WriterCallable writerCallable = null;
+    private ExecutorService executorServiceWrite = null;
+    private List<Future<SQLException>> poolTasks = null;
+
+    // Static variables
+    private static final Log LOG = LogFactory.getLog(JdbcAccessor.class);
+}

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
index 3c56ccb..ad331ed 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
@@ -26,14 +26,12 @@ import org.apache.hawq.pxf.api.LogicalFilter;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.text.ParseException;
 
 /**
- * Uses the filter parser code to build a filter object, either simple - a
- * single {@link BasicFilter} object or a
- * compound - a {@link List} of
- * {@link BasicFilter} objects.
- * The subclass {@link WhereSQLBuilder} will use the filter for
- * generate WHERE statement.
+ * A filter builder. Uses a single {@link BasicFilter} or a {@link List} of {@link BasicFilter} objects.
+ *
+ * The subclass {@link WhereSQLBuilder} will use the result to generate a WHERE statement.
  */
 public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
     /**
@@ -41,26 +39,28 @@ public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
      * list of such filters.
      *
      * @param filterString the string representation of the filter
-     * @return a single {@link BasicFilter}
-     *         object or a {@link List} of
-     *         {@link BasicFilter} objects.
-     * @throws Exception if parsing the filter failed or filter is not a basic
-     *             filter or list of basic filters
+     * @return a {@link BasicFilter} or a {@link List} of {@link BasicFilter}.
+     * @throws ParseException if parsing the filter failed or filter is not a basic filter or list of basic filters
      */
-    public Object getFilterObject(String filterString) throws Exception {
-        FilterParser parser = new FilterParser(this);
-        Object result = parser.parse(filterString.getBytes(FilterParser.DEFAULT_CHARSET));
-
-        if (!(result instanceof LogicalFilter) && !(result instanceof BasicFilter)
-                && !(result instanceof List)) {
-            throw new Exception("String " + filterString
-                    + " resolved to no filter");
+    public Object getFilterObject(String filterString) throws ParseException {
+        try {
+            FilterParser parser = new FilterParser(this);
+            Object result = parser.parse(filterString.getBytes(FilterParser.DEFAULT_CHARSET));
+
+            if (
+                !(result instanceof LogicalFilter) &&
+                !(result instanceof BasicFilter) &&
+                !(result instanceof List)
+            ) {
+                throw new Exception("'" + filterString + "' could not be resolved to a filter");
+            }
+            return result;
+        }
+        catch (Exception e) {
+            throw new ParseException(e.getMessage(), 0);
         }
-
-        return result;
     }
 
-
     @Override
     public Object build(FilterParser.LogicalOperation op, Object leftOperand, Object rightOperand) {
         return handleLogicalOperation(op, leftOperand, rightOperand);
@@ -72,28 +72,33 @@ public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public Object build(FilterParser.Operation opId, Object leftOperand,
                         Object rightOperand) throws Exception {
         // Assume column is on the left
-        return handleSimpleOperations(opId,
-                (FilterParser.ColumnIndex) leftOperand,
-                (FilterParser.Constant) rightOperand);
+        return handleSimpleOperations(
+            opId,
+            (FilterParser.ColumnIndex) leftOperand,
+            (FilterParser.Constant) rightOperand
+        );
     }
 
     @Override
-    public Object build(FilterParser.Operation operation, Object operand) throws Exception {
-        if (operation == FilterParser.Operation.HDOP_IS_NULL || operation == FilterParser.Operation.HDOP_IS_NOT_NULL) {
-            // use null for the constant value of null comparison
+    public Object build(FilterParser.Operation operation, Object operand) throws UnsupportedOperationException {
+        if (
+            operation == FilterParser.Operation.HDOP_IS_NULL ||
+            operation == FilterParser.Operation.HDOP_IS_NOT_NULL
+        ) {
+            // Use null for the constant value of null comparison
             return handleSimpleOperations(operation, (FilterParser.ColumnIndex) operand, null);
-        } else {
-            throw new Exception("Unsupported unary operation " + operation);
+        }
+        else {
+            throw new UnsupportedOperationException("Unsupported unary operation '" + operation + "'");
         }
     }
 
     /*
-     * Handles simple column-operator-constant expressions Creates a special
-     * filter in the case the column is the row key column
+     * Handles simple column-operator-constant expressions.
+     * Creates a special filter in the case the column is the row key column
      */
     private BasicFilter handleSimpleOperations(FilterParser.Operation opId,
                                                FilterParser.ColumnIndex column,
@@ -102,8 +107,7 @@ public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
     }
 
     /**
-     * Handles AND of already calculated expressions. Currently only AND, in the
-     * future OR can be added
+     * Handles AND of already calculated expressions.
      *
      * Four cases here:
      * <ol>
@@ -135,7 +139,6 @@ public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
     }
 
     private Object handleLogicalOperation(FilterParser.LogicalOperation operator, Object leftOperand, Object rightOperand) {
-
         List<Object> result = new LinkedList<>();
 
         result.add(leftOperand);

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
index 914b7d9..4971269 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
@@ -27,296 +27,311 @@ import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.jdbc.utils.ByteUtil;
 import org.apache.hawq.pxf.plugins.jdbc.utils.DbProduct;
 
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
 
 /**
- * Fragmenter class for JDBC data resources.
- *
- * Extends the {@link Fragmenter} abstract class, with the purpose of transforming
- * an input data path (an JDBC Database table name and user request parameters) into a list of regions
- * that belong to this table.
- * <br>
- * The parameter Patterns<br>
- * There are three parameters, the format is as follows:<br>
- * <pre>
- * <code>PARTITION_BY=column_name:column_type&amp;RANGE=start_value[:end_value]&amp;INTERVAL=interval_num[:interval_unit]</code>
- * </pre>
- * The <code>PARTITION_BY</code> parameter can be split by colon(':'), the <code>column_type</code> current supported: <code>date,int,enum</code>.
- * The Date format is 'yyyy-MM-dd'. <br>
- * The <code>RANGE</code> parameter can be split by colon(':'), used to identify the starting range of each fragment.
- * The range is left-closed, ie:<code> '&gt;= start_value AND &lt; end_value' </code>. If the <code>column_type</code> is <code>int</code>,
- * the <code>end_value</code> can be empty. If the <code>column_type</code> is <code>enum</code>, the parameter <code>RANGE</code> can be empty. <br>
- * The <code>INTERVAL</code> parameter can be split by colon(':'), indicate the interval value of one fragment.
- * When <code>column_type</code> is <code>date</code>, this parameter must be split by colon, and <code>interval_unit</code> can be <code>year,month,day</code>.
- * When <code>column_type</code> is <code>int</code>, the <code>interval_unit</code> can be empty.
- * When <code>column_type</code> is <code>enum</code>, the <code>INTERVAL</code> parameter can be empty.
- * <br>
- * <p>
- * The syntax examples is :<br>
- * <code>PARTITION_BY=createdate:date&amp;RANGE=2008-01-01:2010-01-01&amp;INTERVAL=1:month'</code> <br>
- * <code>PARTITION_BY=year:int&amp;RANGE=2008:2010&amp;INTERVAL=1</code> <br>
- * <code>PARTITION_BY=grade:enum&amp;RANGE=excellent:good:general:bad</code>
- * </p>
+ * JDBC fragmenter
  *
+ * Splits the query to allow multiple simultaneous SELECTs
  */
 public class JdbcPartitionFragmenter extends Fragmenter {
-    String[] partitionBy = null;
-    String[] range = null;
-    String[] interval = null;
-    PartitionType partitionType = null;
-    String partitionColumn = null;
-    IntervalType intervalType = null;
-    int intervalNum = 1;
+    /**
+     * Insert fragment constraints into the SQL query.
+     *
+     * @param inputData InputData of the fragment
+     * @param dbName Database name (affects the behaviour for DATE partitions)
+     * @param query SQL query to insert constraints to. The query may contain other WHERE statements
+     */
+    public static void buildFragmenterSql(InputData inputData, String dbName, StringBuilder query) {
+        if (inputData.getUserProperty("PARTITION_BY") == null) {
+            return;
+        }
 
-    //when partitionType is DATE,it is valid
-    Calendar rangeStart = null;
-    Calendar rangeEnd = null;
+        byte[] meta = inputData.getFragmentMetadata();
+        if (meta == null) {
+            return;
+        }
+        String[] partitionBy = inputData.getUserProperty("PARTITION_BY").split(":");
+        String partitionColumn = partitionBy[0];
+        PartitionType partitionType = PartitionType.typeOf(partitionBy[1]);
+        DbProduct dbProduct = DbProduct.getDbProduct(dbName);
 
+        if (!query.toString().contains("WHERE")) {
+            query.append(" WHERE ");
+        }
+        else {
+            query.append(" AND ");
+        }
 
-    enum PartitionType {
-        DATE,
-        INT,
-        ENUM;
+        switch (partitionType) {
+            case DATE: {
+                byte[][] newb = ByteUtil.splitBytes(meta);
+                Date fragStart = new Date(ByteUtil.toLong(newb[0]));
+                Date fragEnd = new Date(ByteUtil.toLong(newb[1]));
 
-        public static PartitionType getType(String str) {
-            return valueOf(str.toUpperCase());
-        }
-    }
+                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
+                query.append(partitionColumn).append(" >= ").append(dbProduct.wrapDate(df.format(fragStart)));
+                query.append(" AND ");
+                query.append(partitionColumn).append(" < ").append(dbProduct.wrapDate(df.format(fragEnd)));
 
-    enum IntervalType {
-        DAY,
-        MONTH,
-        YEAR;
+                break;
+            }
+            case INT: {
+                byte[][] newb = ByteUtil.splitBytes(meta);
+                long fragStart = ByteUtil.toLong(newb[0]);
+                long fragEnd = ByteUtil.toLong(newb[1]);
 
-        public static IntervalType type(String str) {
-            return valueOf(str.toUpperCase());
+                query.append(partitionColumn).append(" >= ").append(fragStart);
+                query.append(" AND ");
+                query.append(partitionColumn).append(" < ").append(fragEnd);
+                break;
+            }
+            case ENUM: {
+                query.append(partitionColumn).append(" = '").append(new String(meta)).append("'");
+                break;
+            }
         }
     }
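The WHERE-vs-AND decision above (made necessary by the removal of 'WHERE 1=1') can be isolated into a small sketch (hypothetical helper showing the INT-partition branch):

```java
// Hypothetical sketch of the constraint-appending logic in buildFragmenterSql().
public class FragmentSqlSketch {
    // Append an INT-partition constraint, starting a WHERE clause only if
    // the query does not already contain one; ranges are left-closed.
    public static void appendIntConstraint(StringBuilder query, String column, long start, long end) {
        query.append(query.toString().contains("WHERE") ? " AND " : " WHERE ");
        query.append(column).append(" >= ").append(start)
             .append(" AND ")
             .append(column).append(" < ").append(end);
    }
}
```

This is how a fragment constraint composes with the regular constraints that WhereSQLBuilder may already have emitted.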
 
     /**
-     * Constructor for JdbcPartitionFragmenter.
+     * Class constructor.
      *
-     * @param inConf input data such as which Jdbc table to scan
-     * @throws UserDataException  if the request parameter is malformed
+     * @param inputData PXF InputData
+     * @throws UserDataException if the request parameter is malformed
      */
-    public JdbcPartitionFragmenter(InputData inConf) throws UserDataException {
-        super(inConf);
-        if (inConf.getUserProperty("PARTITION_BY") == null)
+    public JdbcPartitionFragmenter(InputData inputData) throws UserDataException {
+        super(inputData);
+        if (inputData.getUserProperty("PARTITION_BY") == null) {
             return;
+        }
+
+        // PARTITION_BY
         try {
-            partitionBy = inConf.getUserProperty("PARTITION_BY").split(":");
-            partitionColumn = partitionBy[0];
-            partitionType = PartitionType.getType(partitionBy[1]);
-        } catch (IllegalArgumentException | ArrayIndexOutOfBoundsException e1) {
-            throw new UserDataException("The parameter 'PARTITION_BY' invalid, the pattern is 'column_name:date|int|enum'");
+            partitionType = PartitionType.typeOf(
+                inputData.getUserProperty("PARTITION_BY").split(":")[1]
+            );
+        }
+        catch (IllegalArgumentException | ArrayIndexOutOfBoundsException ex) {
+            throw new UserDataException("The parameter 'PARTITION_BY' is invalid. The pattern is '<column_name>:date|int|enum'");
         }
 
-        //parse and validate parameter-RANGE
+        // RANGE
         try {
-            String rangeStr = inConf.getUserProperty("RANGE");
+            String rangeStr = inputData.getUserProperty("RANGE");
             if (rangeStr != null) {
                 range = rangeStr.split(":");
-                if (range.length == 1 && partitionType != PartitionType.ENUM)
-                    throw new UserDataException("The parameter 'RANGE' does not specify '[:end_value]'");
-            } else
+                if (range.length == 1 && partitionType != PartitionType.ENUM) {
+                    throw new UserDataException("The parameter 'RANGE' must specify ':<end_value>' for this PARTITION_TYPE");
+                }
+            }
+            else {
                 throw new UserDataException("The parameter 'RANGE' must be specified along with 'PARTITION_BY'");
-        } catch (IllegalArgumentException e1) {
-            throw new UserDataException("The parameter 'RANGE' invalid, the pattern is 'start_value[:end_value]'");
+            }
+
+            if (partitionType == PartitionType.DATE) {
+                // Parse DATE partition type values
+                try {
+                    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
+                    rangeDateStart = Calendar.getInstance();
+                    rangeDateStart.setTime(df.parse(range[0]));
+                    rangeDateEnd = Calendar.getInstance();
+                    rangeDateEnd.setTime(df.parse(range[1]));
+                }
+                catch (ParseException e) {
+                    throw new UserDataException("The parameter 'RANGE' has invalid date format. The correct format is 'yyyy-MM-dd'");
+                }
+            }
+            else if (partitionType == PartitionType.INT) {
+                // Parse INT partition type values
+                try {
+                    rangeIntStart = Long.parseLong(range[0]);
+                    rangeIntEnd = Long.parseLong(range[1]);
+                }
+                catch (NumberFormatException e) {
+                    throw new UserDataException("The parameter 'RANGE' is invalid. Both range boundaries must be integers");
+                }
+            }
+        }
+        catch (IllegalArgumentException ex) {
+            throw new UserDataException("The parameter 'RANGE' is invalid. The pattern is '<start_value>[:<end_value>]'");
         }
 
-        //parse and validate parameter-INTERVAL
+        // INTERVAL
         try {
-            String intervalStr = inConf.getUserProperty("INTERVAL");
+            String intervalStr = inputData.getUserProperty("INTERVAL");
             if (intervalStr != null) {
-                interval = intervalStr.split(":");
-                intervalNum = Integer.parseInt(interval[0]);
-                if (interval.length > 1)
-                    intervalType = IntervalType.type(interval[1]);
-                if (interval.length == 1 && partitionType == PartitionType.DATE)
-                    throw new UserDataException("The parameter 'INTERVAL' does not specify unit [:year|month|day]");
-            } else if (partitionType != PartitionType.ENUM)
-                throw new UserDataException("The parameter 'INTERVAL' must be specified along with 'PARTITION_BY'");
-            if (intervalNum < 1)
-                throw new UserDataException("The parameter 'INTERVAL' must > 1, but actual is '" + intervalNum + "'");
-        } catch (IllegalArgumentException e1) {
-            throw new UserDataException("The parameter 'INTERVAL' invalid, the pattern is 'interval_num[:interval_unit]'");
-        }
+                String[] interval = intervalStr.split(":");
+                try {
+                    intervalNum = Long.parseLong(interval[0]);
+                    if (intervalNum < 1) {
+                        throw new UserDataException("The '<interval_num>' in parameter 'INTERVAL' must be at least 1, but actual is " + intervalNum);
+                    }
+                }
+                catch (NumberFormatException ex) {
+                    throw new UserDataException("The '<interval_num>' in parameter 'INTERVAL' must be an integer");
+                }
 
-        //parse any date values
-        try {
-            if (partitionType == PartitionType.DATE) {
-                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
-                rangeStart = Calendar.getInstance();
-                rangeStart.setTime(df.parse(range[0]));
-                rangeEnd = Calendar.getInstance();
-                rangeEnd.setTime(df.parse(range[1]));
+                // Intervals of type DATE
+                if (interval.length > 1) {
+                    intervalType = IntervalType.typeOf(interval[1]);
+                }
+                if (interval.length == 1 && partitionType == PartitionType.DATE) {
+                    throw new UserDataException("The parameter 'INTERVAL' must specify unit (':year|month|day') for the PARTITION_TYPE = 'DATE'");
+                }
+            }
+            else if (partitionType != PartitionType.ENUM) {
+                throw new UserDataException("The parameter 'INTERVAL' must be specified along with 'PARTITION_BY' for this PARTITION_TYPE");
             }
-        } catch (ParseException e) {
-            throw new UserDataException("The parameter 'RANGE' has invalid date format. Expected format is 'YYYY-MM-DD'");
+        }
+        catch (IllegalArgumentException ex) {
+            throw new UserDataException("The parameter 'INTERVAL' is invalid. The pattern is '<interval_num>[:<interval_unit>]'");
         }
     }
 
     /**
-     * Returns statistics for Jdbc table. Currently it's not implemented.
      * @throws UnsupportedOperationException ANALYZE for Jdbc plugin is not supported
      */
     @Override
     public FragmentsStats getFragmentsStats() throws UnsupportedOperationException {
-        throw new UnsupportedOperationException("ANALYZE for Jdbc plugin is not supported");
+        throw new UnsupportedOperationException("ANALYZE for JDBC plugin is not supported");
     }
 
     /**
-     * Returns list of fragments containing all of the
-     * Jdbc table data.
+     * getFragments() implementation
      *
-     * @return a list of fragments
-     * @throws Exception if assign host error
+     * @return a list of fragments to be passed to PXF segments
      */
     @Override
-    public List<Fragment> getFragments() throws Exception {
+    public List<Fragment> getFragments() {
         if (partitionType == null) {
-            byte[] fragmentMetadata = null;
-            byte[] userData = null;
-            Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData);
+            // No partition case
+            Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, null);
             fragments.add(fragment);
-            return prepareHosts(fragments);
+            return fragments;
         }
+
         switch (partitionType) {
             case DATE: {
-                int currInterval = intervalNum;
+                Calendar fragStart = rangeDateStart;
 
-                Calendar fragStart = rangeStart;
-                while (fragStart.before(rangeEnd)) {
-                    Calendar fragEnd = (Calendar) fragStart.clone();
+                while (fragStart.before(rangeDateEnd)) {
+                    // Calculate a new fragment
+                    Calendar fragEnd = (Calendar)fragStart.clone();
                     switch (intervalType) {
                         case DAY:
-                            fragEnd.add(Calendar.DAY_OF_MONTH, currInterval);
+                            fragEnd.add(Calendar.DAY_OF_MONTH, (int)intervalNum);
                             break;
                         case MONTH:
-                            fragEnd.add(Calendar.MONTH, currInterval);
+                            fragEnd.add(Calendar.MONTH, (int)intervalNum);
                             break;
                         case YEAR:
-                            fragEnd.add(Calendar.YEAR, currInterval);
+                            fragEnd.add(Calendar.YEAR, (int)intervalNum);
                             break;
                     }
-                    if (fragEnd.after(rangeEnd))
-                        fragEnd = (Calendar) rangeEnd.clone();
+                    if (fragEnd.after(rangeDateEnd))
+                        fragEnd = (Calendar)rangeDateEnd.clone();
 
-                    //make metadata of this fragment , converts the date to a millisecond,then get bytes.
+                    // Convert to byte[]
                     byte[] msStart = ByteUtil.getBytes(fragStart.getTimeInMillis());
                     byte[] msEnd = ByteUtil.getBytes(fragEnd.getTimeInMillis());
                     byte[] fragmentMetadata = ByteUtil.mergeBytes(msStart, msEnd);
 
-                    byte[] userData = new byte[0];
-                    Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData);
+                    // Write fragment
+                    Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, fragmentMetadata);
                     fragments.add(fragment);
 
-                    //continue next fragment.
+                    // Prepare for the next fragment
                     fragStart = fragEnd;
                 }
                 break;
             }
             case INT: {
-                int rangeStart = Integer.parseInt(range[0]);
-                int rangeEnd = Integer.parseInt(range[1]);
-                int currInterval = intervalNum;
+                long fragStart = rangeIntStart;
 
-                //validate : curr_interval > 0
-                int fragStart = rangeStart;
-                while (fragStart < rangeEnd) {
-                    int fragEnd = fragStart + currInterval;
-                    if (fragEnd > rangeEnd) fragEnd = rangeEnd;
+                while (fragStart < rangeIntEnd) {
+                    // Calculate a new fragment
+                    long fragEnd = fragStart + intervalNum;
+                    if (fragEnd > rangeIntEnd) {
+                        fragEnd = rangeIntEnd;
+                    }
 
+                    // Convert to byte[]
                     byte[] bStart = ByteUtil.getBytes(fragStart);
                     byte[] bEnd = ByteUtil.getBytes(fragEnd);
                     byte[] fragmentMetadata = ByteUtil.mergeBytes(bStart, bEnd);
 
-                    byte[] userData = new byte[0];
-                    Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData);
+                    // Write fragment
+                    Fragment fragment = new 
Fragment(inputData.getDataSource(), pxfHosts, fragmentMetadata);
                     fragments.add(fragment);
 
-                    //continue next fragment.
-                    fragStart = fragEnd;// + 1;
+                    // Prepare for the next fragment
+                    fragStart = fragEnd;
                 }
                 break;
             }
-            case ENUM:
+            case ENUM: {
                 for (String frag : range) {
                     byte[] fragmentMetadata = frag.getBytes();
-                    Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, new byte[0]);
+                    Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, fragmentMetadata);
                     fragments.add(fragment);
                 }
                 break;
+            }
         }
 
-        return prepareHosts(fragments);
+        return fragments;
     }
 
-    /**
-     * For each fragment , assigned a host address.
-     * In Jdbc Plugin, 'replicas' is the host address of the PXF engine that is running, not the database engine.
-     * Since the other PXF host addresses can not be probed, only the host name of the current PXF engine is returned.
-     * @param fragments a list of fragments
-     * @return a list of fragments that assigned hosts.
-     * @throws UnknownHostException if InetAddress.getLocalHost error.
-     */
-    public static List<Fragment> prepareHosts(List<Fragment> fragments) throws UnknownHostException {
-        for (Fragment fragment : fragments) {
-            String pxfHost = InetAddress.getLocalHost().getHostAddress();
-            String[] hosts = new String[]{pxfHost};
-            fragment.setReplicas(hosts);
-        }
+    // Partition parameters (filled by class constructor)
+    private String[] range = null;
+    private PartitionType partitionType = null;
+    private long intervalNum;
 
-        return fragments;
-    }
+    // Partition parameters for INT partitions (filled by class constructor)
+    private long rangeIntStart;
+    private long rangeIntEnd;
 
-    public String buildFragmenterSql(String dbName, String originSql) {
-        byte[] meta = inputData.getFragmentMetadata();
-        if (meta == null)
-            return originSql;
+    // Partition parameters for DATE partitions (filled by class constructor)
+    private IntervalType intervalType;
+    private Calendar rangeDateStart;
+    private Calendar rangeDateEnd;
 
-        DbProduct dbProduct = DbProduct.getDbProduct(dbName);
+    private static enum PartitionType {
+        DATE,
+        INT,
+        ENUM;
 
-        StringBuilder sb = new StringBuilder(originSql);
-        if (!originSql.contains("WHERE"))
-            sb.append(" WHERE 1=1 ");
+        public static PartitionType typeOf(String str) {
+            return valueOf(str.toUpperCase());
+        }
+    }
 
-        sb.append(" AND ");
-        switch (partitionType) {
-            case DATE: {
-                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
-                //parse metadata of this fragment
-                //validate: the length of metadata == 16 (long)
-                byte[][] newb = ByteUtil.splitBytes(meta, 8);
-                Date fragStart = new Date(ByteUtil.toLong(newb[0]));
-                Date fragEnd = new Date(ByteUtil.toLong(newb[1]));
+    private static enum IntervalType {
+        DAY,
+        MONTH,
+        YEAR;
 
-                sb.append(partitionColumn).append(" >= ").append(dbProduct.wrapDate(df.format(fragStart)));
-                sb.append(" AND ");
-                sb.append(partitionColumn).append(" < ").append(dbProduct.wrapDate(df.format(fragEnd)));
+        public static IntervalType typeOf(String str) {
+            return valueOf(str.toUpperCase());
+        }
+    }
 
-                break;
-            }
-            case INT: {
-                //validate: the length of metadata == 8 (int)
-                byte[][] newb = ByteUtil.splitBytes(meta, 4);
-                int fragStart = ByteUtil.toInt(newb[0]);
-                int fragEnd = ByteUtil.toInt(newb[1]);
-                sb.append(partitionColumn).append(" >= ").append(fragStart);
-                sb.append(" AND ");
-                sb.append(partitionColumn).append(" < ").append(fragEnd);
-                break;
-            }
-            case ENUM:
-                sb.append(partitionColumn).append("='").append(new String(meta)).append("'");
-                break;
+    // A PXF engine to use as a host for fragments
+    private static final String[] pxfHosts;
+    static {
+        String[] localhost = {"localhost"};
+        try {
+            localhost[0] = InetAddress.getLocalHost().getHostAddress();
+        }
+        catch (UnknownHostException ex) {
+            // If the local address cannot be determined, fall back to "localhost"
         }
-        return sb.toString();
+        pxfHosts = localhost;
     }
 }
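The INT branch of the fragmenter above walks a half-open range and serializes each boundary pair through ByteUtil. A standalone sketch of that logic (FragmenterSketch, split, pack, and unpack are illustrative names; big-endian encoding via java.nio is an assumption about ByteUtil's byte layout, not taken from the patch):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class FragmenterSketch {
    // Mirrors the INT branch of getFragments(): split the half-open range
    // [rangeStart, rangeEnd) into fragments of at most `interval` values each
    public static List<long[]> split(long rangeStart, long rangeEnd, long interval) {
        List<long[]> fragments = new ArrayList<>();
        long fragStart = rangeStart;
        while (fragStart < rangeEnd) {
            long fragEnd = Math.min(fragStart + interval, rangeEnd);
            fragments.add(new long[] { fragStart, fragEnd });
            fragStart = fragEnd; // boundaries are half-open, hence no "+ 1"
        }
        return fragments;
    }

    // Stand-in for ByteUtil.getBytes + ByteUtil.mergeBytes: two longs -> 16 bytes
    public static byte[] pack(long fragStart, long fragEnd) {
        return ByteBuffer.allocate(16).putLong(fragStart).putLong(fragEnd).array();
    }

    // Stand-in for ByteUtil.splitBytes + ByteUtil.toLong on the accessor side
    public static long[] unpack(byte[] meta) {
        ByteBuffer buffer = ByteBuffer.wrap(meta);
        return new long[] { buffer.getLong(), buffer.getLong() };
    }

    public static void main(String[] args) {
        for (long[] f : split(0, 10, 4)) {
            System.out.println(f[0] + " .. " + f[1]);
        }
    }
}
```

Half-open boundaries let each fragment's WHERE clause use `col >= start AND col < end` with no off-by-one, which is why the patch drops the old `// + 1` comment.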

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
index c0af405..6715508 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
@@ -19,95 +19,209 @@ package org.apache.hawq.pxf.plugins.jdbc;
  * under the License.
  */
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Plugin;
 
-import java.sql.*;
+import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
+import java.sql.Statement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
- * This class resolves the jdbc connection parameter and manages the opening and closing of the jdbc connection.
- * Implemented subclasses: {@link JdbcReadAccessor}.
+ * JDBC tables plugin (base class)
  *
+ * Implemented subclasses: {@link JdbcAccessor}, {@link JdbcResolver}.
  */
 public class JdbcPlugin extends Plugin {
-    private static final Log LOG = LogFactory.getLog(JdbcPlugin.class);
-
-    //jdbc connection parameters
-    protected String jdbcDriver = null;
-    protected String dbUrl = null;
-    protected String user = null;
-    protected String pass = null;
-    protected String tblName = null;
-    protected int batchSize = 100;
-
-    //jdbc connection
-    protected Connection dbConn = null;
-    //database type, from DatabaseMetaData.getDatabaseProductName()
-    protected String dbProduct = null;
-
     /**
-     * parse input data
+     * Class constructor
+     *
+     * @param input {@link InputData} provided by PXF
      *
-     * @param input the input data
-     * @throws UserDataException if the request parameter is malformed
+     * @throws UserDataException if one of the required request parameters is not set
      */
     public JdbcPlugin(InputData input) throws UserDataException {
         super(input);
+
         jdbcDriver = input.getUserProperty("JDBC_DRIVER");
+        if (jdbcDriver == null) {
+            throw new UserDataException("JDBC_DRIVER is a required parameter");
+        }
+
         dbUrl = input.getUserProperty("DB_URL");
-        user = input.getUserProperty("USER");
-        pass = input.getUserProperty("PASS");
-        String strBatch = input.getUserProperty("BATCH_SIZE");
-        if (strBatch != null) {
-            batchSize = Integer.parseInt(strBatch);
+        if (dbUrl == null) {
+            throw new UserDataException("DB_URL is a required parameter");
         }
 
-        if (jdbcDriver == null) {
-            throw new UserDataException("JDBC_DRIVER must be set");
+        tableName = input.getDataSource();
+        if (tableName == null) {
+            throw new UserDataException("Data source must be provided");
         }
-        if (dbUrl == null) {
-            throw new UserDataException("DB_URL must be set(read)");
+        /*
+        At the moment, when writing into some table, the table name is
+        concatenated with a special string that is necessary to write into HDFS.
+        However, a raw table name is necessary in case of JDBC.
+        The correct table name is extracted here.
+        */
+        Matcher matcher = tableNamePattern.matcher(tableName);
+        if (matcher.matches()) {
+            inputData.setDataSource(matcher.group(1));
+            tableName = input.getDataSource();
         }
 
-        tblName = input.getDataSource();
-        if (tblName == null) {
-            throw new UserDataException("TABLE_NAME must be set as DataSource.");
+        columns = inputData.getTupleDescription();
+        if (columns == null) {
+            throw new UserDataException("Tuple description must be provided");
+        }
+
+        // This parameter is not required. The default value is null
+        user = input.getUserProperty("USER");
+        if (user != null) {
+            pass = input.getUserProperty("PASS");
+        }
+
+        // This parameter is not required. The default value is DEFAULT_BATCH_SIZE
+        String batchSizeRaw = input.getUserProperty("BATCH_SIZE");
+        if (batchSizeRaw != null) {
+            try {
+                batchSize = Integer.parseInt(batchSizeRaw);
+                if (batchSize < 0) {
+                    throw new NumberFormatException();
+                } else if (batchSize == 0) {
+                    batchSize = 1;
+                }
+                batchSizeIsSetByUser = true;
+            }
+            catch (NumberFormatException e) {
+                throw new UserDataException("BATCH_SIZE is incorrect: must be a non-negative integer");
+            }
         }
-    }
 
-    public String getTableName() {
-        return tblName;
+        // This parameter is not required. The default value is 1
+        String poolSizeRaw = input.getUserProperty("POOL_SIZE");
+        if (poolSizeRaw != null) {
+            try {
+                poolSize = Integer.parseInt(poolSizeRaw);
+            }
+            catch (NumberFormatException e) {
+                throw new UserDataException("POOL_SIZE is incorrect: must be an integer");
+            }
+        }
     }
 
-    protected Connection openConnection() throws ClassNotFoundException, SQLException {
+    /**
+     * Open a new JDBC connection
+     *
+     * @throws ClassNotFoundException if the JDBC driver was not found
+     * @throws SQLException if a database access error occurs
+     * @throws SQLTimeoutException if a connection problem occurs
+     */
+    public Connection getConnection() throws ClassNotFoundException, SQLException, SQLTimeoutException {
+        Connection connection;
         if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("Open JDBC: driver=%s,url=%s,user=%s,pass=%s,table=%s",
-                    jdbcDriver, dbUrl, user, pass, tblName));
-        }
-        if (dbConn == null || dbConn.isClosed()) {
-            Class.forName(jdbcDriver);
             if (user != null) {
-                dbConn = DriverManager.getConnection(dbUrl, user, pass);
-            } else {
-                dbConn = DriverManager.getConnection(dbUrl);
+                LOG.debug(String.format("Open JDBC connection: driver=%s, url=%s, user=%s, pass=%s, table=%s",
+                    jdbcDriver, dbUrl, user, pass, tableName));
+            }
+            else {
+                LOG.debug(String.format("Open JDBC connection: driver=%s, url=%s, table=%s",
+                    jdbcDriver, dbUrl, tableName));
             }
-            DatabaseMetaData meta = dbConn.getMetaData();
-            dbProduct = meta.getDatabaseProductName();
         }
-        return dbConn;
+        Class.forName(jdbcDriver);
+        if (user != null) {
+            connection = DriverManager.getConnection(dbUrl, user, pass);
+        }
+        else {
+            connection = DriverManager.getConnection(dbUrl);
+        }
+        return connection;
     }
 
-    protected void closeConnection() {
+    /**
+     * Close a JDBC connection
+     */
+    public static void closeConnection(Connection connection) {
         try {
-            if (dbConn != null) {
-                dbConn.close();
-                dbConn = null;
+            if ((connection != null) && (!connection.isClosed())) {
+                if ((connection.getMetaData().supportsTransactions()) && (!connection.getAutoCommit())) {
+                    connection.commit();
+                }
+                connection.close();
             }
-        } catch (SQLException e) {
-            LOG.error("Close db connection error . ", e);
         }
+        catch (SQLException e) {
+            LOG.error("JDBC connection close error", e);
+        }
+    }
+
+    /**
+     * Prepare a JDBC PreparedStatement
+     *
+     * @throws ClassNotFoundException if the JDBC driver was not found
+     * @throws SQLException if a database access error occurs
+     * @throws SQLTimeoutException if a connection problem occurs
+     */
+    public PreparedStatement getPreparedStatement(Connection connection, String query) throws SQLException, SQLTimeoutException, ClassNotFoundException {
+        if ((connection == null) || (query == null)) {
+            throw new IllegalArgumentException("The provided query or connection is null");
+        }
+        if (connection.getMetaData().supportsTransactions()) {
+            connection.setAutoCommit(false);
+        }
+        return connection.prepareStatement(query);
     }
+
+    /**
+     * Close a JDBC Statement (and the connection it is based on)
+     */
+    public static void closeStatement(Statement statement) {
+        if (statement == null) {
+            return;
+        }
+        Connection connection = null;
+        try {
+            if (!statement.isClosed()) {
+                connection = statement.getConnection();
+                statement.close();
+            }
+        }
+        catch (Exception e) {
+            // Ignore: the underlying connection is still closed below
+        }
+        closeConnection(connection);
+    }
+
+    // JDBC parameters
+    protected String jdbcDriver = null;
+    protected String dbUrl = null;
+    protected String user = null;
+    protected String pass = null;
+
+    protected String tableName = null;
+
+    // '100' is a recommended value: https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754
+    public static final int DEFAULT_BATCH_SIZE = 100;
+    // After argument parsing, this value is guaranteed to be >= 1
+    protected int batchSize = DEFAULT_BATCH_SIZE;
+    protected boolean batchSizeIsSetByUser = false;
+
+    protected int poolSize = 1;
+
+    // Columns description
+    protected ArrayList<ColumnDescriptor> columns = null;
+
+
+    private static final Log LOG = LogFactory.getLog(JdbcPlugin.class);
+
+    // At the moment, when writing into some table, the table name is
+    // concatenated with a special string that is necessary to write into HDFS.
+    // However, a raw table name is necessary in case of JDBC. This Pattern
+    // extracts the correct table name from the given InputData.dataSource
+    private static final Pattern tableNamePattern = Pattern.compile("/(.*)/[0-9]*-[0-9]*_[0-9]*");
 }
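The tableNamePattern above strips the HDFS write-path suffix from InputData.dataSource on the write path. A standalone sketch of that extraction (the TableNameExtractor class and the sample paths are hypothetical; only the regex is taken from the patch):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TableNameExtractor {
    // Same pattern as JdbcPlugin.tableNamePattern: group(1) is the raw table name
    private static final Pattern TABLE_NAME_PATTERN =
            Pattern.compile("/(.*)/[0-9]*-[0-9]*_[0-9]*");

    public static String extract(String dataSource) {
        Matcher matcher = TABLE_NAME_PATTERN.matcher(dataSource);
        // On the read path there is no suffix, so keep the data source as-is
        return matcher.matches() ? matcher.group(1) : dataSource;
    }

    public static void main(String[] args) {
        System.out.println(extract("/my_table/3-42_1")); // my_table
        System.out.println(extract("my_table"));         // unchanged: my_table
    }
}
```

Because `Matcher.matches()` requires a full match, a plain table name without the suffix falls through untouched, so the same constructor serves both SELECT and INSERT paths.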

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java
deleted file mode 100644
index 2ca9a94..0000000
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package org.apache.hawq.pxf.plugins.jdbc;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.hawq.pxf.api.OneRow;
-import org.apache.hawq.pxf.api.ReadAccessor;
-import org.apache.hawq.pxf.api.UserDataException;
-import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
-import org.apache.hawq.pxf.api.utilities.InputData;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.sql.*;
-import java.util.ArrayList;
-
-/**
- * Accessor for Jdbc tables. The accessor will open and read a partition belonging
- * to a Jdbc table. JdbcReadAccessor generates and executes SQL from filter and
- * fragmented information, uses {@link JdbcReadResolver } to read the ResultSet, and generates
- * the data type - List {@link OneRow} that HAWQ needs.
- */
-public class JdbcReadAccessor extends JdbcPlugin implements ReadAccessor {
-    private static final Log LOG = LogFactory.getLog(JdbcReadAccessor.class);
-
-    WhereSQLBuilder filterBuilder = null;
-    private ColumnDescriptor keyColumn = null;
-
-    private String querySql = null;
-    private Statement statement = null;
-    private ResultSet resultSet = null;
-
-    public JdbcReadAccessor(InputData input) throws UserDataException {
-        super(input);
-        filterBuilder = new WhereSQLBuilder(inputData);
-
-        //buid select statement (not contain where statement)
-        ArrayList<ColumnDescriptor> columns = input.getTupleDescription();
-        StringBuilder sb = new StringBuilder();
-        sb.append("SELECT ");
-        for (int i = 0; i < columns.size(); i++) {
-            ColumnDescriptor column = columns.get(i);
-            if (column.isKeyColumn())
-                keyColumn = column;
-            if (i > 0) sb.append(",");
-            sb.append(column.columnName());
-        }
-        sb.append(" FROM ").append(getTableName());
-        querySql = sb.toString();
-    }
-
-    /**
-     * open db connection, execute query sql
-     */
-    @Override
-    public boolean openForRead() throws Exception {
-        if (statement != null && !statement.isClosed())
-            return true;
-        super.openConnection();
-
-        statement = dbConn.createStatement();
-
-        resultSet = executeQuery(querySql);
-
-        return true;
-    }
-
-    public ResultSet executeQuery(String sql) throws Exception {
-        String query = sql;
-        if (inputData.hasFilter()) {
-            //parse filter string , build where statement
-            String whereSql = filterBuilder.buildWhereSQL(dbProduct);
-
-            if (whereSql != null) {
-                query = query + " WHERE " + whereSql;
-            }
-        }
-
-        //according to the fragment information, rewriting sql
-        JdbcPartitionFragmenter fragmenter = new JdbcPartitionFragmenter(inputData);
-        query = fragmenter.buildFragmenterSql(dbProduct, query);
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("executeQuery: " + query);
-        }
-
-        return statement.executeQuery(query);
-    }
-
-    @Override
-    public OneRow readNextObject() throws Exception {
-        if (resultSet.next()) {
-            return new OneRow(null, resultSet);
-        }
-        return null;
-    }
-
-    @Override
-    public void closeForRead() throws Exception {
-        if (statement != null && !statement.isClosed()) {
-            statement.close();
-            statement = null;
-        }
-        super.closeConnection();
-    }
-}
\ No newline at end of file
