HAWQ-1605. Support INSERT in PXF JDBC plugin
(closes #1353)
Fix incorrect TIMESTAMP handling
PXF JDBC plugin update
* Add support for INSERT queries:
* The INSERT queries are processed by the same classes as the SELECT
queries;
* INSERTs are processed by the JDBC PreparedStatement;
* INSERTs support batching (by means of JDBC);
* Minor changes in WhereSQLBuilder and JdbcPartitionFragmenter:
* Removed 'WHERE 1=1';
* The same pattern of spaces around operators everywhere ('a = b', not
'a=b');
* JdbcPartitionFragmenter.buildFragmenterSql() made static to avoid
extra checks of InputData (proposed by @sansanichfb);
* Refactoring and some micro-optimizations;
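The INSERT batching described above can be illustrated by a minimal sketch. `RowBatcher` and its methods are hypothetical and not part of PXF. The `flusher` callback stands in for the JDBC calls a real writer would make for each full batch (for example, `PreparedStatement.addBatch()` per row followed by `executeBatch()`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of PXF): collects rows and hands them over
 * in batches of a fixed size.
 */
class RowBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> flusher;
    private final List<T> pending = new ArrayList<>();

    RowBatcher(int batchSize, Consumer<List<T>> flusher) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    /** Buffer one row and flush as soon as a full batch has accumulated. */
    void add(T row) {
        pending.add(row);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Send whatever is left, e.g. when the writer is being closed. */
    void flush() {
        if (!pending.isEmpty()) {
            // Hand over a copy so the consumer may keep it after the buffer is cleared
            flusher.accept(new ArrayList<>(pending));
            pending.clear();
        }
    }
}
```

With a batch size of 1, every row is flushed on its own, which corresponds to non-batched INSERTs. The final `flush()` mirrors the "send data that is left" step performed when writing is finished.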
PXF JDBC refactoring
* The README.md is completely rewritten;
* Lots of changes in comments and javadoc comments;
* Code refactoring and minor changes in codestyle
Fixes proposed by @sansanichfb
Add DbProduct for Microsoft SQL Server
Notes on consistency in README and errors
* Add an explicit note on consistency of INSERT queries (it is not guaranteed).
* Change error message on INSERT failure
* Minor corrections of README
The fixes were proposed by @sansanichfb
Improve WhereSQLBuilder
* Add support of TIMESTAMP values;
* Add support of operations <>, LIKE, IS NULL, IS NOT NULL.
Fix proposed by @sansanichfb
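Combined with the earlier WhereSQLBuilder changes (no `WHERE 1=1`, spaces around operators), the new operators could be rendered roughly as in this sketch. `SqlOperator` and `WhereSketch` are hypothetical names, not the actual `WhereSQLBuilder` API. Literal quoting and TIMESTAMP formatting, which depend on the target database, are left out.

```java
import java.util.List;

/** Hypothetical operator set; not the actual WhereSQLBuilder API. */
enum SqlOperator {
    EQUALS("="),
    NOT_EQUALS("<>"),
    LESS_THAN("<"),
    GREATER_THAN(">"),
    LIKE("LIKE"),
    IS_NULL("IS NULL"),
    IS_NOT_NULL("IS NOT NULL");

    private final String sql;

    SqlOperator(String sql) {
        this.sql = sql;
    }

    /** IS NULL and IS NOT NULL take no right-hand operand. */
    boolean isUnary() {
        return this == IS_NULL || this == IS_NOT_NULL;
    }

    /** Render "column OP literal" with single spaces around the operator. */
    String render(String column, String literal) {
        return isUnary() ? column + " " + sql : column + " " + sql + " " + literal;
    }
}

final class WhereSketch {
    private WhereSketch() {}

    /** Join conditions with AND; no conditions means no WHERE clause at all. */
    static String where(List<String> conditions) {
        return conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
    }
}
```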
Throw an exception when trying to open an already open connection when writing
to an external database using `openForWrite()`.
Although the behaviour is different in case of `openForRead()`, it does not
apply here. The second call to `openForWrite()` could be made from another
thread, and that would result in a race: the `PreparedStatement` we use to
write to an external database is the same object for all threads, and the
procedure `writeNextObject()` is not `synchronized` (or "protected" some other
way).
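The fail-fast behaviour described above can be sketched with `AtomicBoolean.compareAndSet`, which makes the check and the state change a single atomic step. This is a swapped-in technique for illustration only: according to the diff below, `JdbcAccessor.openForWrite()` checks `statementWrite` and throws a `SQLException`, whereas this hypothetical `SingleOpenWriter` throws `IllegalStateException`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical writer that refuses a second openForWrite() instead of sharing one statement. */
class SingleOpenWriter {
    private final AtomicBoolean open = new AtomicBoolean(false);

    void openForWrite() {
        // compareAndSet checks and marks in one atomic step, so two threads
        // cannot both pass the check
        if (!open.compareAndSet(false, true)) {
            throw new IllegalStateException("The connection to an external database is already open.");
        }
        // A real writer would create its connection and PreparedStatement here
    }

    void closeForWrite() {
        // A real writer would close its statement here
        open.set(false);
    }
}
```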
Simplify logging; BatchUpdateException
Simplify logging so that the logs produced by pxf-jdbc do not grow too big in
case DEBUG is enabled (the removed logging calls provide the field types and
names, and in most cases they are the same as in the data provided. The
exceptions are still being logged).
Add processing of BatchUpdateException, so that the real cause of an exception
is returned to the user.
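A `BatchUpdateException` often reports only that the batch failed, while some drivers chain the underlying error via `SQLException.getNextException()`. A minimal sketch of unwrapping it follows. `BatchErrors.rootCause` is a hypothetical helper, and the way PXF itself extracts the cause may differ.

```java
import java.sql.BatchUpdateException;
import java.sql.SQLException;

final class BatchErrors {
    private BatchErrors() {}

    /**
     * Hypothetical helper: for a BatchUpdateException, return the first chained
     * exception (if any), which some drivers use to describe the actual failure;
     * return any other exception unchanged.
     */
    static SQLException rootCause(SQLException e) {
        if (e instanceof BatchUpdateException && e.getNextException() != null) {
            return e.getNextException();
        }
        return e;
    }
}
```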
PXF JDBC thread pool support
Implement support of multi-threaded processing of INSERT queries, using a
thread pool. To use the feature, set the parameter POOL_SIZE in the LOCATION
clause of an external table (<1: pool size is equal to the number of CPUs
available to the JVM; =1: disable thread pool; >1: use a pool of the given
size).
Not all operations are processed by pool threads: pool threads only execute()
the queries, but they do not fill the PreparedStatement from OneRow.
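The POOL_SIZE rules above can be summarised as a small function. `PoolSizes` is a hypothetical helper. It assumes that an absent parameter means "no pool" (as the README in this commit states) and uses `Runtime.getRuntime().availableProcessors()`, which is what `JdbcAccessor.openForWrite()` calls for values below 1.

```java
final class PoolSizes {
    private PoolSizes() {}

    /**
     * Hypothetical helper: effective number of writer threads.
     *   null (POOL_SIZE absent) -> 1 (no pool)
     *   value < 1               -> CPUs available to the JVM
     *   value >= 1              -> the value itself; 1 means no pool
     */
    static int resolve(Integer poolSize) {
        if (poolSize == null) {
            return 1;
        }
        if (poolSize < 1) {
            return Runtime.getRuntime().availableProcessors();
        }
        return poolSize;
    }

    /** A pool is only created for more than one thread. */
    static boolean usesPool(int resolvedSize) {
        return resolvedSize > 1;
    }
}
```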
Redesign connection pooling
* Redesign connection pooling: move OneRow objects processing to threads from
the pool. This decreases the load of a single-thread part of PXF;
* Introduce WriterCallable & related. This significantly simplifies the code of
JdbcAccessor, makes it easy to introduce new methods of processing INSERT
queries, and allows quick hardcoded tweaks for the same purpose.
* Add docs on thread pool feature
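The WriterCallable design can be illustrated with a self-contained sketch: rows are `supply()`-ed to a task, a full task is submitted to a fixed thread pool, every `Future` is awaited, and the first failure is reported, similar to what `JdbcAccessor.closeForWrite()` does. `BatchTask` and `PooledWriter` are hypothetical. An `AtomicInteger` stands in for the external database, and an empty row is treated as a failure only to make errors observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical task in the spirit of WriterCallable; returns its failure instead of throwing it. */
class BatchTask implements Callable<Exception> {
    private final int batchSize;
    private final AtomicInteger sink; // stands in for the external database
    private final List<String> rows = new ArrayList<>();

    BatchTask(int batchSize, AtomicInteger sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    void supply(String row) {
        rows.add(row);
    }

    boolean isCallRequired() {
        return rows.size() >= batchSize;
    }

    @Override
    public Exception call() {
        try {
            for (String row : rows) {
                if (row.isEmpty()) {
                    throw new IllegalArgumentException("empty row"); // simulated write failure
                }
                sink.incrementAndGet();
            }
            rows.clear();
            return null;
        } catch (Exception e) {
            return e;
        }
    }
}

final class PooledWriter {
    private PooledWriter() {}

    /** Write all rows through a fixed pool; return the first failure, or null. */
    static Exception writeAll(List<String> input, int batchSize, int poolSize, AtomicInteger sink) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        List<Future<Exception>> tasks = new ArrayList<>();
        BatchTask current = new BatchTask(batchSize, sink);
        try {
            for (String row : input) {
                current.supply(row);
                if (current.isCallRequired()) {
                    tasks.add(pool.submit(current)); // a pool thread writes the full batch
                    current = new BatchTask(batchSize, sink);
                }
            }
            Exception first = null;
            for (Future<Exception> task : tasks) { // wait for every task, keep the first failure
                try {
                    Exception e = task.get();
                    if (e != null && first == null) {
                        first = e;
                    }
                } catch (ExecutionException e) {
                    if (first == null) {
                        first = e;
                    }
                }
            }
            // Send the leftover rows only if no task failed
            return first != null ? first : current.call();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            pool.shutdown();
        }
    }
}
```

Returning the exception from `call()` instead of throwing it lets the caller wait for every task before reporting, so no pool thread is left unexamined.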
Support long values in PARTITION clause
Support values of Java primitive type 'long' in PARTITION clause (both for
RANGE and INTERVAL variables).
* Modify JdbcPartitionFragmenter (convert all int variables to long)
* Move parsing of INTERVAL values for PARTITION_TYPE "INT" to class constructor
(and add a parse exception handler)
* Simplify ByteUtil (remove methods to deal with values of type 'int')
* Update JdbcPartitionFragmenterTest
* Minor changes in comments
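A sketch of how a `long`-valued INT partition could be split into fragments, each with a left-closed `WHERE` constraint. `IntPartitions` is hypothetical, and the exact SQL text produced by `JdbcPartitionFragmenter` may differ. The first test case uses `&PARTITION_BY=id:int&RANGE=42:142&INTERVAL=2` from the README in this commit.

```java
import java.util.ArrayList;
import java.util.List;

final class IntPartitions {
    private IntPartitions() {}

    /**
     * Hypothetical sketch: split RANGE=<start>:<end> into left-closed fragments of
     * INTERVAL values each, one WHERE constraint per fragment. All arithmetic uses
     * long, so bounds beyond Integer.MAX_VALUE are accepted.
     */
    static List<String> constraints(String column, String range, String interval) {
        String[] bounds = range.split(":");
        if (bounds.length != 2) {
            throw new IllegalArgumentException("RANGE must be <start_value>:<end_value>");
        }
        long start;
        long end;
        long step;
        try {
            start = Long.parseLong(bounds[0]);
            end = Long.parseLong(bounds[1]);
            step = Long.parseLong(interval);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("RANGE and INTERVAL must be integer values", e);
        }
        if (step <= 0 || start >= end) {
            throw new IllegalArgumentException("INTERVAL must be positive and start_value < end_value");
        }
        List<String> result = new ArrayList<>();
        long lo = start;
        while (lo < end) {
            long hi = lo + step;
            if (hi < lo || hi > end) {
                hi = end; // overflow guard; the last fragment may also be smaller
            }
            result.add(column + " >= " + lo + " AND " + column + " < " + hi);
            lo = hi;
        }
        return result;
    }
}
```

Because consecutive fragments share a bound (`< hi` in one, `>= hi` in the next), every value in the range falls into exactly one fragment.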
Fix pxf-profiles-default.xml
Remove an ampersand from the description of the JDBC profile in
pxf-profiles-default.xml
Remove explicit throws of IllegalArgumentException
Remove explicit references to 'IllegalArgumentException', as the caller is
probably unable to recover from them.
'IllegalStateException' is left unchanged, as it is thrown when the caller must
perform an action that will resolve the problem ('WriterCallable' is full).
Other runtime exceptions are explicitly listed in function definitions as
before; their causes are usually known to the caller, so it could do something
about them or at least send a more meaningful message about the error cause to
the user.
Proposed by Alex Denissov <[email protected]>
Simplify isCallRequired()
Make function 'isCallRequired()' body a one-line expression in all
implementations of 'WriterCallable'.
Proposed by Alex Denissov <[email protected]>
Remove rollback and change BATCH_SIZE logic
Remove calls to 'tryRollback()' and all processing of rollbacks in INSERT
queries.
The reason for the change is that rollback is effective for only one case:
INSERT is performed from one PXF segment that uses one thread to perform that
INSERT, and the external database supports transactions. In most cases, there
is more than one PXF segment performing the INSERT, and rollback is of no use
then.
On the other hand, rollback logic is cumbersome and notably increases code
complexity.
Due to the removal of rollback, there is no longer a need to keep BATCH_SIZE
infinite as often as possible (when BATCH_SIZE is infinite, the number of
scenarios in which rollback() fails is lower, but not zero).
Thus, setting a recommended
(https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754)
value makes sense.
The old logic of infinite batch size also remains active.
Modify README.md: minor corrections, new BATCH_SIZE logic
Proposed by Alex Denissov <[email protected]>
Change BATCH_SIZE logic
* Modify BATCH_SIZE parameter processing according to new proposals
https://github.com/apache/hawq/pull/1353#discussion_r214413534
* Update README.md
* Restore fallback to non-batched INSERTs in case the external database (or
JDBC connector) does not support batch updates
Proposed by Alex Denissov <[email protected]>
Proposed by Dmitriy Pavlov <[email protected]>
Modify processing of BATCH_SIZE parameter
Modify BATCH_SIZE parameter processing according to the proposal
https://github.com/apache/hawq/pull/1353#discussion_r215023775:
* Update allowed values of BATCH_SIZE and their meanings
* Introduce an explicit flag indicating whether a BATCH_SIZE parameter is
present
* Introduce DEFAULT_BATCH_SIZE constant in JdbcPlugin
* Move processing of BATCH_SIZE values to JdbcAccessor
* Update README.md
Proposed by @divyabhargov, @denalex
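The resulting BATCH_SIZE semantics (as documented in the README of this commit) can be summarised in one function. `BatchSizes` is hypothetical: `driverSupportsBatch` stands for the result of `DatabaseMetaData.supportsBatchUpdates()`, and `IllegalStateException` replaces the `SQLException` that `JdbcAccessor` throws. The README does not say how negative values are handled; this sketch rejects them.

```java
final class BatchSizes {
    /** Default from the README: a recommended batch size of 100. */
    static final int DEFAULT_BATCH_SIZE = 100;

    private BatchSizes() {}

    /**
     * Hypothetical summary of the BATCH_SIZE rules.
     *
     * @param param               raw BATCH_SIZE value, or null if the parameter is absent
     * @param driverSupportsBatch what DatabaseMetaData.supportsBatchUpdates() reported
     * @return effective batch size; 1 means rows are inserted one by one
     */
    static int resolve(String param, boolean driverSupportsBatch) {
        boolean setByUser = param != null;
        int size = DEFAULT_BATCH_SIZE;
        if (setByUser) {
            try {
                size = Integer.parseInt(param.trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("BATCH_SIZE must be an integer", e);
            }
            if (size < 0) {
                throw new IllegalArgumentException("BATCH_SIZE must not be negative"); // assumption
            }
            if (size == 0) {
                size = 1; // 0 and 1 both disable batching
            }
        }
        if (!driverSupportsBatch) {
            if (setByUser && size > 1) {
                // An explicit request for batching cannot be honoured
                throw new IllegalStateException("The external database does not support batch updates");
            }
            size = 1; // fall back to non-batched INSERTs
        }
        return size;
    }
}
```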
Fix column type for columns converted to TEXT
Modify column type processing so that the column type is set correctly for
fields that:
* Are represented as columns of type TEXT by GPDBWritable, but whose actual
type is different
* Contain NULL value
Before, the column type code was not set correctly for such columns due to a
check of NULL field value.
Proposed and authored by @divyabhargov
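The essence of the fix is that the JDBC type code must be derived from the declared column type, not from the value, so that a NULL field can still be bound with `PreparedStatement.setNull(index, typeCode)`. Below is a hypothetical mapping for the types listed in the README. The actual mapping in `JdbcResolver` may differ (for example, for `BYTEA`).

```java
import java.sql.Types;
import java.util.Locale;

final class ColumnTypes {
    private ColumnTypes() {}

    /**
     * Hypothetical mapping from a declared HAWQ column type to a java.sql.Types code.
     * It depends only on the column definition, never on the current value, so a
     * NULL field still gets the right code for PreparedStatement.setNull().
     */
    static int sqlType(String declaredType) {
        switch (declaredType.toLowerCase(Locale.ROOT)) {
            case "smallint": case "int2": return Types.SMALLINT;
            case "integer": case "int4": return Types.INTEGER;
            case "bigint": case "int8": return Types.BIGINT;
            case "real": case "float4": return Types.REAL;
            case "float8": return Types.DOUBLE;
            case "numeric": return Types.NUMERIC;
            case "boolean": case "bool": return Types.BOOLEAN;
            case "date": return Types.DATE;
            case "timestamp": return Types.TIMESTAMP;
            case "bytea": return Types.BINARY;
            case "varchar": case "bpchar": case "text": return Types.VARCHAR;
            default: throw new IllegalArgumentException("Unsupported column type: " + declaredType);
        }
    }
}
```

A NULL field in a `BIGINT` column would then be written with `statement.setNull(i, ColumnTypes.sqlType("bigint"))`, regardless of how GPDBWritable represents the field.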
removed parseUnsignedInt
Project: http://git-wip-us.apache.org/repos/asf/hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/hawq/commit/472fa2b7
Tree: http://git-wip-us.apache.org/repos/asf/hawq/tree/472fa2b7
Diff: http://git-wip-us.apache.org/repos/asf/hawq/diff/472fa2b7
Branch: refs/heads/master
Commit: 472fa2b74679a5d2b6fb08dd107b292a95e577b7
Parents: a741655
Author: Ivan Leskin <[email protected]>
Authored: Mon Mar 5 17:19:26 2018 +0300
Committer: Alexander Denissov <[email protected]>
Committed: Fri Sep 7 12:23:24 2018 -0700
----------------------------------------------------------------------
pxf/pxf-jdbc/README.md | 341 ++++++++++------
.../hawq/pxf/plugins/jdbc/JdbcAccessor.java | 353 +++++++++++++++++
.../pxf/plugins/jdbc/JdbcFilterBuilder.java | 75 ++--
.../plugins/jdbc/JdbcPartitionFragmenter.java | 391 ++++++++++---------
.../hawq/pxf/plugins/jdbc/JdbcPlugin.java | 226 ++++++++---
.../hawq/pxf/plugins/jdbc/JdbcReadAccessor.java | 122 ------
.../hawq/pxf/plugins/jdbc/JdbcReadResolver.java | 103 -----
.../hawq/pxf/plugins/jdbc/JdbcResolver.java | 367 +++++++++++++++++
.../hawq/pxf/plugins/jdbc/WhereSQLBuilder.java | 162 +++++---
.../hawq/pxf/plugins/jdbc/utils/ByteUtil.java | 38 +-
.../hawq/pxf/plugins/jdbc/utils/DbProduct.java | 45 ++-
.../plugins/jdbc/utils/MicrosoftProduct.java | 35 ++
.../pxf/plugins/jdbc/utils/MysqlProduct.java | 10 +-
.../pxf/plugins/jdbc/utils/OracleProduct.java | 11 +-
.../pxf/plugins/jdbc/utils/PostgresProduct.java | 11 +-
.../writercallable/BatchWriterCallable.java | 109 ++++++
.../writercallable/SimpleWriterCallable.java | 102 +++++
.../jdbc/writercallable/WriterCallable.java | 56 +++
.../writercallable/WriterCallableFactory.java | 97 +++++
.../jdbc/JdbcPartitionFragmenterTest.java | 16 +-
.../hawq/pxf/plugins/jdbc/SqlBuilderTest.java | 54 +--
.../src/main/resources/pxf-profiles-default.xml | 6 +-
22 files changed, 1984 insertions(+), 746 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/README.md
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/README.md b/pxf/pxf-jdbc/README.md
index c158f32..fde1641 100644
--- a/pxf/pxf-jdbc/README.md
+++ b/pxf/pxf-jdbc/README.md
@@ -1,139 +1,254 @@
-# Accessing Jdbc Table Data
+# PXF JDBC plugin
-The PXF JDBC plug-in reads data stored in Traditional relational database,ie : mysql,ORACLE,postgresql.
+The PXF JDBC plugin allows access to external databases that implement [the Java Database Connectivity API](http://www.oracle.com/technetwork/java/javase/jdbc/index.html). Both read (SELECT) and write (INSERT) operations are supported by the plugin.
-PXF-JDBC plug-in is the client of the database, the host running the database engine does not need to
-deploy PXF.
+The PXF JDBC plugin is a JDBC client. The host running the external database does not need to deploy PXF.
-# Prerequisites
+## Prerequisites
-Check the following before using PXF to access JDBC Table:
-* The PXF JDBC plug-in is installed on all cluster nodes.
-* The JDBC JAR files are installed on all cluster nodes, and added to file - 'pxf-public.classpath'
-* You have tested PXF on HDFS.
+Check the following before using the PXF JDBC plugin:
-# Using PXF Tables to Query JDBC Table
-Jdbc tables are defined in same schema in PXF.The PXF table has the same column name
-as Jdbc Table, and the column type requires a mapping of Jdbc-HAWQ.
+* The PXF JDBC plugin is installed on all PXF nodes;
+* The JDBC driver for the external database is installed on all PXF nodes;
+* All PXF nodes are allowed to connect to the external database.
-## Syntax Example
-The following PXF table definition is valid for Jdbc Table.
- CREATE [READABLE|WRITABLE] EXTERNAL TABLE table_name
- ( column_name data_type [, ...] | LIKE other_table )
- LOCATION ('pxf://namenode[:port]/jdbc-schema-name.jdbc-table-name?<pxf-parameters><&custom-parameters>')
- FORMAT 'CUSTOM' (formatter='pxfwritable_import')
-If `jdbc-schema-name` is omitted, pxf will default to the `default` schema.
+## Limitations
-The `column_name` must exists in jdbc-table,`data_type` equals or similar to
-the jdbc-column type.
+Both the **PXF table** and the **table in the external database** **must have the same definition**. Their columns must have the same names, and the columns' types must correspond.
-where `<pxf-parameters>` is:
+**Not all data types are supported** by the plugin. The following PXF data types are supported:
- [FRAGMENTER=org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter
- &ACCESSOR=org.apache.hawq.pxf.plugins.jdbc.JdbcReadAccessor
- &RESOLVER=org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver]
- | PROFILE=Jdbc
+* `INTEGER`, `BIGINT`, `SMALLINT`
+* `REAL`, `FLOAT8`
+* `NUMERIC`
+* `BOOLEAN`
+* `VARCHAR`, `BPCHAR`, `TEXT`
+* `DATE`
+* `TIMESTAMP`
+* `BYTEA`
-where `<custom-parameters>` is:
+The `<full_external_table_name>` (see below) **must not match** the [pattern](https://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html) `/.*/[0-9]*-[0-9]*_[0-9]*`, that is, the name must not both start with `/` and end with `/` followed by three groups of digits of arbitrary length, the first two separated by `-` and the last two separated by `_`. For example, the following table name is not allowed: `/public.table/1-2_3`.
- JDBC_DRIVER=<jdbc-driver-class-name>
- &DB_URL=<jdbc-url>&USER=<database-user>&PASS=<password>
+At the moment, one PXF external table cannot serve both SELECT and INSERT queries. A separate PXF external table is required for each type of query.
-## Jdbc Table to HAWQ Data Type Mapping
-Jdbc-table and hawq-table data type system is similar to, does not require
-a special type of mapping.
-# Usage
-The following to mysql, for example, describes the use of PDF-JDBC.
-To query MySQL Table in HAWQ, perform the following steps:
-1. create Table in MySQL
+## Syntax
+```
+CREATE [ READABLE | WRITABLE ] EXTERNAL TABLE <table_name> (
+ { <column_name> <data_type> [, ...] | LIKE <other_table> }
+)
+LOCATION (
+ 'pxf://<full_external_table_name>?<pxf_parameters><jdbc_required_parameters><jdbc_login_parameters><plugin_parameters>'
+)
+FORMAT 'CUSTOM' (FORMATTER={'pxfwritable_import' | 'pxfwritable_export'})
+```
- mysql> use demodb;
- mysql> create table myclass(
- id int(4) not null primary key,
- name varchar(20) not null,
- gender int(4) not null default '0',
- degree double(16,2));`
-2. insert test data
+The **`<pxf_parameters>`** are:
+```
+{
+PROFILE=JDBC
+|
+FRAGMENTER=org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter
+&ACCESSOR=org.apache.hawq.pxf.plugins.jdbc.JdbcAccessor
+&RESOLVER=org.apache.hawq.pxf.plugins.jdbc.JdbcResolver
+}
+```
- insert into myclass values(1,"tom",1,90);
- insert into myclass values(2,'john',0,94);
- insert into myclass values(3,'simon',1,79);
-3. copy mysql-jdbc jar files to `/usr/lib/pxf` (on all cluster nodes), and
-edit `/etc/pxf/conf/pxf-public.classpath` , add :
+The **`<jdbc_required_parameters>`** are:
+```
+&JDBC_DRIVER=<external_database_jdbc_driver>
+&DB_URL=<external_database_url>
+```
- /usr/lib/pxf/mysql-connector-java-*.jar
+The **`<jdbc_login_parameters>`** are **optional**, but if provided, both of them must be present:
+```
+&USER=<external_database_login>
+&PASS=<external_database_password>
+```
- Restart all pxf-engine.
+The **`<plugin_parameters>`** are **optional**:
-4. create Table in HAWQ:
+```
+[
+&BATCH_SIZE=<batch_size>
+]
+[
+&POOL_SIZE=<pool_size>
+]
+[
+&PARTITION_BY=<column>:<column_type>
+&RANGE=<start_value>:<end_value>
+[&INTERVAL=<value>[:<unit>]]
+]
+```
- gpadmin=# CREATE EXTERNAL TABLE myclass(id integer,
- name text,
- gender integer,
- degree float8)
- LOCATION ('pxf://localhost:51200/demodb.myclass'
- '?PROFILE=JDBC'
- '&JDBC_DRIVER=com.mysql.jdbc.Driver'
- '&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
- )
- FORMAT 'CUSTOM' (Formatter='pxfwritable_import');
+The meaning of `BATCH_SIZE` is given in section [batching of INSERT queries](#batching).
-MySQL instance IP: 192.168.200.6, port: 3306.
+The meaning of `POOL_SIZE` is given in section [using thread pool for INSERT queries](#thread-pool).
-5. query mysql data in HAWQ:
+The meaning of the other parameters is given in section [partitioning](#partitioning).
- gpadmin=# select * from myclass;
- gpadmin=# select * from myclass where id=2;
-# Jdbc Table Fragments
-## intro
-PXF-JDBC plug-in as a client to access jdbc database.By default, there is
-only one pxf-instance connectied JDBC Table.If the jdbc table data is large,
-you can also use multiple pxf-instance to access the JDBC table by fragments.
+## SELECT queries
-## Syntax
-where `<custom-parameters>` can use following partition parameters:
-
- PARTITION_BY=column_name:column_type&RANGE=start_value[:end_value]&INTERVAL=interval_num[:interval_unit]
-The `PARTITION_BY` parameter indicates which column to use as the partition column.
-It can be split by colon(':'),the `column_type` current supported : `date|int|enum` .
-The Date format is `yyyy-MM-dd`.
-The `PARTITION_BY` parameter can be null, and there will be only one fragment.
-
-The `RANGE` parameter indicates the range of data to be queried , it can be split by colon(':').
- The range is left-closed, ie: `>= start_value AND < end_value` .
-
-The `INTERVAL` parameter can be split by colon(':'), indicate the interval
- value of one fragment. When `column_type` is `date`,this parameter must
- be split by colon, and `interval_unit` can be `year|month|day`. When
- `column_type` is int, the `interval_unit` can be empty. When `column_type`
- is enum,the `INTERVAL` parameter can be empty.
-
-The syntax examples is :
-
- * PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month'
- * PARTITION_BY=year:int&RANGE=2008:2010&INTERVAL=1
- * PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad
-
-## Usage
-MySQL Table:
-
- CREATE TABLE sales (id int primary key, cdate date, amt decimal(10,2),grade varchar(30))
-HAWQ Table:
-
- CREATE EXTERNAL TABLE sales(id integer,
- cdate date,
- amt float8,
- grade text)
- LOCATION ('pxf://localhost:51200/sales'
- '?PROFILE=JDBC'
- '&JDBC_DRIVER=com.mysql.jdbc.Driver'
- '&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
- '&PARTITION_BY=cdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:year'
- )
- FORMAT 'CUSTOM' (Formatter='pxfwritable_import');
-At PXF-JDBC plugin,this will generate 2 fragments.Then HAWQ assign these fragments to 2 PXF-instance
-to access jdbc table data.
\ No newline at end of file
+The PXF JDBC plugin can perform SELECT queries on external tables.
+
+To perform SELECT queries, create a `READABLE EXTERNAL TABLE` or just an `EXTERNAL TABLE` with `FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import')` in PXF.
+
+The `BATCH_SIZE` parameter is not used in such tables. *However*, if this parameter is present, its value will be checked for correctness (it must be an integer).
+
+
+## INSERT queries
+
+The PXF JDBC plugin can perform INSERT queries on external tables. Note that **the plugin does not guarantee consistency for INSERT queries**. Use a staging table in the external database to deal with this.
+
+To perform INSERT queries, create a `WRITABLE EXTERNAL TABLE` with `FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export')` in PXF.
+
+The `PARTITION_BY`, `RANGE` and `INTERVAL` parameters in such tables are ignored.
+
+
+### Batching
+
+INSERT queries can be batched. This may significantly increase performance if batching is supported by the external database.
+
+Batching is enabled by default, and the default batch size is `100` (this is a [recommended](https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754) value). To control this feature, create an external table with the parameter `BATCH_SIZE` set to:
+* `0` or `1`. Batching will be disabled;
+* `integer > 1`. Batch will be of given size.
+
+Batching must be supported by the JDBC driver of the external database. If the driver does not support batching, the behaviour depends on the `BATCH_SIZE` parameter:
+* `BATCH_SIZE` is not present, or is `0` or `1`. PXF will try to execute the INSERT query without batching;
+* `BATCH_SIZE` is an `integer > 1`. The INSERT query will fail with an appropriate error message.
+
+
+## Thread pool
+
+INSERT queries can be processed by multiple threads. This may significantly increase performance if the external database can work with multiple connections simultaneously.
+
+It is recommended to use batching together with a thread pool. In this case, each thread receives data from one (whole) batch and processes it. If a thread pool is used without batching, each thread in the pool will receive exactly one tuple; as a rule, this takes much more time than a usual single-threaded INSERT.
+
+If any thread in the pool fails, the user will get an error message. However, even if the INSERT fails, some data may still have been INSERTed into the external database.
+
+To enable the thread pool, create an external table with the parameter `POOL_SIZE` set to:
+* `integer < 1`. The number of threads in the pool will be equal to the number of CPUs available to the JVM;
+* `integer > 1`. The thread pool will consist of the given number of threads;
+* `1`. The thread pool will be disabled.
+
+By default (`POOL_SIZE` is absent), thread pool is not used.
+
+
+## Partitioning
+
+The PXF JDBC plugin supports simultaneous read access to an external table from multiple PXF segments. This feature is called partitioning.
+
+
+### Syntax
+
+Use the following `<plugin_parameters>` (mentioned above) to activate partitioning:
+
+```
+&PARTITION_BY=<column>:<column_type>
+&RANGE=<start_value>:<end_value>
+[&INTERVAL=<value>[:<unit>]]
+```
+
+* The `PARTITION_BY` parameter indicates which column to use as the partition column. Only one column can be used as the partition column;
+ * The `<column>` is the name of a partition column;
+ * The `<column_type>` is the data type of a partition column. At the moment, the **supported types** are `INT`, `DATE` and `ENUM`.
+
+* The `RANGE` parameter indicates the range of data to be queried.
+ * If the partition type is `ENUM`, the `RANGE` parameter must be a list of values, each of which forms its own fragment;
+ * If the partition type is `INT` or `DATE`, the `RANGE` parameter must be a finite left-closed range (`... >= start_value AND ... < end_value`);
+ * For `DATE` partitions, the date format must be `yyyy-MM-dd`.
+
+* The `INTERVAL` parameter is **required** for `INT` and `DATE` partitions. It is ignored if `<column_type>` is `ENUM`.
+ * The `<value>` is the size of each fragment (the last one may be made smaller by the plugin);
+ * The `<unit>` **must** be provided if `<column_type>` is `DATE`. `year`, `month` and `day` are supported. This parameter is ignored in case of any other `<column_type>`.
+
+Example partitions:
+* `&PARTITION_BY=id:int&RANGE=42:142&INTERVAL=2`
+* `&PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month`
+* `&PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad`
+
+
+### Mechanism
+
+If partitioning is activated, the SELECT query is split into a set of small queries, each of which is called a *fragment*. All the fragments are processed by separate PXF instances simultaneously. If there are more fragments than PXF instances, some instances will process more than one fragment; if only one PXF instance is available, it will process all the fragments.
+
+Extra query constraints (`WHERE` expressions) are automatically added to each fragment to guarantee that every tuple of data is retrieved from the external database exactly once.
+
+
+### Partitioning example
+Consider the following MySQL table:
+```
+CREATE TABLE sales (
+ id int primary key,
+ cdate date,
+ amt decimal(10,2),
+ grade varchar(30)
+)
+```
+and the following HAWQ table:
+```
+CREATE EXTERNAL TABLE sales(
+ id integer,
+ cdate date,
+ amt float8,
+ grade text
+)
+LOCATION ('pxf://sales?PROFILE=JDBC&JDBC_DRIVER=com.mysql.jdbc.Driver&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&PARTITION_BY=cdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:year')
+FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
+```
+
+The PXF JDBC plugin will generate two fragments for the query `SELECT * FROM sales`. Then HAWQ will assign each of them to a separate PXF segment. Each segment will perform the SELECT query: the first one will get tuples with `cdate` values in year `2008`, while the second will get tuples for year `2009`. Then each PXF segment will send its results back to HAWQ, where they will be "concatenated" and returned.
+
+
+## Examples
+
+The following example shows how to access a MySQL table via JDBC.
+
+Suppose a MySQL instance is available at `192.168.200.6:3306`. A table is created in MySQL:
+```
+use demodb;
+create table myclass(
+ id int(4) not null primary key,
+ name varchar(20) not null,
+ degree double(16,2)
+);
+```
+
+Then some data is inserted into MySQL table:
+```
+insert into myclass values(1, 'tom', 90);
+insert into myclass values(2, 'john', 94);
+insert into myclass values(3, 'simon', 79);
+```
+
+The MySQL JDBC driver files (JAR) are copied to `/usr/lib/pxf` on all cluster nodes, and a line is added to `/etc/pxf/conf/pxf-public.classpath`:
+```
+/usr/lib/pxf/mysql-connector-java-*.jar
+```
+
+After this, all PXF segments are restarted.
+
+Then a table in HAWQ is created:
+```
+CREATE EXTERNAL TABLE myclass(
+ id integer,
+ name text,
+ degree float8
+)
+LOCATION (
+ 'pxf://localhost:51200/demodb.myclass?PROFILE=JDBC&JDBC_DRIVER=com.mysql.jdbc.Driver&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
+)
+FORMAT 'CUSTOM' (
+ FORMATTER='pxfwritable_import'
+);
+```
+
+Finally, a query to a HAWQ external table is made:
+```
+SELECT * FROM myclass;
+SELECT id, name FROM myclass WHERE id = 2;
+```
http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java
new file mode 100644
index 0000000..2cacafd
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java
@@ -0,0 +1,353 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.WriteAccessor;
+import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallable;
+import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallableFactory;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.sql.Statement;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * JDBC tables accessor
+ *
+ * The SELECT queries are processed by {@link java.sql.Statement}
+ *
+ * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
+ * built-in JDBC batches of arbitrary size
+ */
+public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
+ /**
+ * Class constructor
+ */
+ public JdbcAccessor(InputData inputData) throws UserDataException {
+ super(inputData);
+ }
+
+ /**
+ * openForRead() implementation
+ * Create query, open JDBC connection, execute query and store the result into resultSet
+ *
+ * @throws SQLException if a database access error occurs
+ * @throws SQLTimeoutException if a problem with the connection occurs
+ * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
+ * @throws ClassNotFoundException if the JDBC driver was not found
+ */
+ @Override
+ public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
+ if (statementRead != null && !statementRead.isClosed()) {
+ return true;
+ }
+
+ Connection connection = super.getConnection();
+
+ queryRead = buildSelectQuery(connection.getMetaData());
+ statementRead = connection.createStatement();
+ resultSetRead = statementRead.executeQuery(queryRead);
+
+ return true;
+ }
+
+ /**
+ * readNextObject() implementation
+ * Retrieve the next tuple from resultSet and return it
+ *
+ * @throws SQLException if a problem in resultSet occurs
+ */
+ @Override
+ public OneRow readNextObject() throws SQLException {
+ if (resultSetRead.next()) {
+ return new OneRow(resultSetRead);
+ }
+ return null;
+ }
+
+ /**
+ * closeForRead() implementation
+ */
+ @Override
+ public void closeForRead() {
+ JdbcPlugin.closeStatement(statementRead);
+ }
+
+ /**
+ * openForWrite() implementation
+ * Create query template and open JDBC connection
+ *
+ * @throws SQLException if a database access error occurs
+ * @throws SQLTimeoutException if a problem with the connection occurs
+ * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
+ * @throws ClassNotFoundException if the JDBC driver was not found
+ */
+ @Override
+ public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
+ if (statementWrite != null && !statementWrite.isClosed()) {
+ throw new SQLException("The connection to an external database is already open.");
+ }
+
+ Connection connection = super.getConnection();
+
+ queryWrite = buildInsertQuery();
+ statementWrite = super.getPreparedStatement(connection, queryWrite);
+
+ // Process batchSize
+ if (!connection.getMetaData().supportsBatchUpdates()) {
+ if ((batchSizeIsSetByUser) && (batchSize > 1)) {
+ throw new SQLException("The external database does not support batch updates");
+ }
+ else {
+ batchSize = 1;
+ }
+ }
+
+ // Process poolSize
+ if (poolSize < 1) {
+ poolSize = Runtime.getRuntime().availableProcessors();
+ LOG.info(
+ "The POOL_SIZE is set to the number of CPUs available (" + Integer.toString(poolSize) + ")"
+ );
+ }
+ if (poolSize > 1) {
+ executorServiceWrite = Executors.newFixedThreadPool(poolSize);
+ poolTasks = new LinkedList<>();
+ }
+
+ // Setup WriterCallableFactory
+ writerCallableFactory = new WriterCallableFactory();
+ writerCallableFactory.setPlugin(this);
+ writerCallableFactory.setQuery(queryWrite);
+ writerCallableFactory.setBatchSize(batchSize);
+ if (poolSize == 1) {
+ writerCallableFactory.setStatement(statementWrite);
+ }
+
+ writerCallable = writerCallableFactory.get();
+
+ return true;
+ }
+
+ /**
+ * writeNextObject() implementation
+ *
+ * If batchSize is not 0 or 1, add a tuple to the batch of statementWrite
+ * Otherwise, execute an INSERT query immediately
+ *
+ * In both cases, a {@link java.sql.PreparedStatement} is used
+ *
+ * @throws SQLException if a database access error occurs
+ * @throws IOException if the data provided by {@link JdbcResolver} is corrupted
+ * @throws ClassNotFoundException if pooling is used and the JDBC driver was not found
+ * @throws IllegalStateException if writerCallableFactory was not properly initialized
+ * @throws Exception if it happens in writerCallable.call()
+ */
+ @Override
+ public boolean writeNextObject(OneRow row) throws Exception {
+ if (writerCallable == null) {
+ throw new IllegalStateException("The JDBC connection was not properly initialized (writerCallable is null)");
+ }
+
+ writerCallable.supply(row);
+ if (writerCallable.isCallRequired()) {
+ if (poolSize > 1) {
+ // Pooling is used. Create new writerCallable
+ poolTasks.add(executorServiceWrite.submit(writerCallable));
+ writerCallable = writerCallableFactory.get();
+ }
+ else {
+ // Pooling is not used
+ writerCallable.call();
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * closeForWrite() implementation
+ *
+ * @throws SQLException if a database access error occurs
+ * @throws Exception if it happens in writerCallable.call() or due to runtime errors in the thread pool
+ */
+ @Override
+ public void closeForWrite() throws Exception {
+ if ((statementWrite == null) || (writerCallable == null)) {
+ return;
+ }
+
+ try {
+ if (poolSize > 1) {
+ // Process thread pool
+ Exception firstException = null;
+ for (Future<SQLException> task : poolTasks) {
+ // Examine every task (instead of stopping at the first error) so that all connections opened by pool threads get a chance to close
+ try {
+ SQLException currentSqlException = task.get();
+ if (currentSqlException != null) {
+ if (firstException == null) {
+ firstException = currentSqlException;
+ }
+ LOG.error(
+ "A SQLException in a pool thread occurred: " + currentSqlException.getClass() + " " + currentSqlException.getMessage()
+ );
+ }
+ }
+ catch (Exception e) {
+ // This exception was caused by a thread execution error, not returned by the task. Tasks that have not been examined yet may still report an SQLException, so firstException is left unchanged
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "A runtime exception in a thread pool occurred: " + e.getClass() + " " + e.getMessage()
+ );
+ }
+ }
+ }
+ try {
+ executorServiceWrite.shutdown();
+ executorServiceWrite.shutdownNow();
+ }
+ catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("executorServiceWrite.shutdown() or .shutdownNow() threw an exception: " + e.getClass() + " " + e.getMessage());
+ }
+ }
+ if (firstException != null) {
+ throw firstException;
+ }
+ }
+
+ // Send data that is left
+ writerCallable.call();
+ }
+ finally {
+ JdbcPlugin.closeStatement(statementWrite);
+ }
+ }
+
+
+ /**
+ * Build SELECT query (with "WHERE" and partition constraints)
+ *
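+ * @param databaseMetaData metadata of the external database; its product name is passed to the WHERE and partition constraint builders
+ * @return Complete SQL query
+ *
+ * @throws ParseException if the constraints passed in InputData are incorrect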
+ * @throws SQLException if the database metadata is invalid
+ */
+ private String buildSelectQuery(DatabaseMetaData databaseMetaData) throws ParseException, SQLException {
+ if (databaseMetaData == null) {
+ throw new IllegalArgumentException("The provided databaseMetaData is null");
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ");
+
+ // Insert columns' names
+ String columnDivisor = "";
+ for (ColumnDescriptor column : columns) {
+ sb.append(columnDivisor);
+ columnDivisor = ", ";
+ sb.append(column.columnName());
+ }
+
+ // Insert the table name
+ sb.append(" FROM ").append(tableName);
+
+ // Insert regular WHERE constraints
+ (new WhereSQLBuilder(inputData)).buildWhereSQL(databaseMetaData.getDatabaseProductName(), sb);
+
+ // Insert partition constraints
+ JdbcPartitionFragmenter.buildFragmenterSql(inputData, databaseMetaData.getDatabaseProductName(), sb);
+
+ return sb.toString();
+ }
+
+ /**
+ * Build INSERT query template (field values are replaced by placeholders '?')
+ *
+ * @return SQL query with placeholders instead of actual values
+ */
+ private String buildInsertQuery() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("INSERT INTO ");
+
+ // Insert the table name
+ sb.append(tableName);
+
+ // Insert columns' names
+ sb.append("(");
+ String fieldDivisor = "";
+ for (ColumnDescriptor column : columns) {
+ sb.append(fieldDivisor);
+ fieldDivisor = ", ";
+ sb.append(column.columnName());
+ }
+ sb.append(")");
+
+ sb.append(" VALUES ");
+
+ // Insert values placeholders
+ sb.append("(");
+ fieldDivisor = "";
+ for (int i = 0; i < columns.size(); i++) {
+ sb.append(fieldDivisor);
+ fieldDivisor = ", ";
+ sb.append("?");
+ }
+ sb.append(")");
+
+ return sb.toString();
+ }
+
+ // Read variables
+ private String queryRead = null;
+ private Statement statementRead = null;
+ private ResultSet resultSetRead = null;
+
+ // Write variables
+ private String queryWrite = null;
+ private PreparedStatement statementWrite = null;
+ private WriterCallableFactory writerCallableFactory = null;
+ private WriterCallable writerCallable = null;
+ private ExecutorService executorServiceWrite = null;
+ private List<Future<SQLException> > poolTasks = null;
+
+ // Static variables
+ private static final Log LOG = LogFactory.getLog(JdbcAccessor.class);
+}
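The write path above builds a single INSERT template with `?` placeholders and, when POOL_SIZE is greater than 1, hands batches to a thread pool whose tasks return an `SQLException` (or null) instead of throwing it; `closeForWrite()` then waits for every task and rethrows only the first reported error. The sketch below reproduces those two ideas outside PXF so they can run without a database. It is an illustration under assumptions: the class `InsertWriteSketch`, its methods, and the use of plain `String` column names instead of `ColumnDescriptor` are hypothetical, and no JDBC connection is involved.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InsertWriteSketch {
    // Builds "INSERT INTO t(a, b) VALUES (?, ?)" like buildInsertQuery(),
    // from plain column names (a hypothetical stand-in for ColumnDescriptor)
    static String buildInsertQuery(String tableName, List<String> columns) {
        StringBuilder sb = new StringBuilder("INSERT INTO ").append(tableName);
        sb.append("(").append(String.join(", ", columns)).append(")");
        sb.append(" VALUES (");
        for (int i = 0; i < columns.size(); i++) {
            sb.append(i == 0 ? "?" : ", ?");
        }
        return sb.append(")").toString();
    }

    // Waits for every task, keeps only the first returned SQLException and
    // rethrows it once all tasks have been examined
    static void drain(List<Future<SQLException>> tasks) throws Exception {
        Exception firstException = null;
        for (Future<SQLException> task : tasks) {
            try {
                SQLException e = task.get();
                if (e != null && firstException == null) {
                    firstException = e;
                }
            } catch (Exception executionError) {
                // A failure of the task itself; keep going so every task is examined
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(buildInsertQuery("t", Arrays.asList("a", "b")));

        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Each task reports a database error by returning it instead of throwing
        Callable<SQLException> ok = () -> null;
        Callable<SQLException> failing = () -> new SQLException("batch 2 failed");
        List<Future<SQLException>> tasks = new ArrayList<>();
        tasks.add(pool.submit(ok));
        tasks.add(pool.submit(failing));
        pool.shutdown();
        try {
            drain(tasks);
        } catch (SQLException e) {
            System.out.println("first error: " + e.getMessage());
        }
    }
}
```

Returning the exception from the task, rather than throwing it, lets the draining loop tell database errors apart from failures of the pool itself, which is the distinction the `closeForWrite()` loop relies on.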
http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
index 3c56ccb..ad331ed 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
@@ -26,14 +26,12 @@ import org.apache.hawq.pxf.api.LogicalFilter;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
+import java.text.ParseException;
/**
- * Uses the filter parser code to build a filter object, either simple - a
- * single {@link BasicFilter} object or a
- * compound - a {@link List} of
- * {@link BasicFilter} objects.
- * The subclass {@link WhereSQLBuilder} will use the filter for
- * generate WHERE statement.
+ * A filter builder. Uses a single {@link BasicFilter} or a {@link List} of {@link BasicFilter} objects.
+ *
+ * The subclass {@link WhereSQLBuilder} uses the result to generate the WHERE clause.
*/
public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
/**
@@ -41,26 +39,28 @@ public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
* list of such filters.
*
* @param filterString the string representation of the filter
- * @return a single {@link BasicFilter}
- * object or a {@link List} of
- * {@link BasicFilter} objects.
- * @throws Exception if parsing the filter failed or filter is not a basic
- * filter or list of basic filters
+ * @return a {@link BasicFilter} or a {@link List} of {@link BasicFilter}.
+ * @throws ParseException if parsing the filter failed or the filter is not a basic filter or a list of basic filters
*/
- public Object getFilterObject(String filterString) throws Exception {
- FilterParser parser = new FilterParser(this);
- Object result = parser.parse(filterString.getBytes(FilterParser.DEFAULT_CHARSET));
-
- if (!(result instanceof LogicalFilter) && !(result instanceof BasicFilter)
- && !(result instanceof List)) {
- throw new Exception("String " + filterString
- + " resolved to no filter");
+ public Object getFilterObject(String filterString) throws ParseException {
+ try {
+ FilterParser parser = new FilterParser(this);
+ Object result = parser.parse(filterString.getBytes(FilterParser.DEFAULT_CHARSET));
+
+ if (
+ !(result instanceof LogicalFilter) &&
+ !(result instanceof BasicFilter) &&
+ !(result instanceof List)
+ ) {
+ throw new Exception("'" + filterString + "' could not be resolved to a filter");
+ }
+ return result;
+ }
+ catch (Exception e) {
+ throw new ParseException(e.getMessage(), 0);
}
-
- return result;
}
-
@Override
public Object build(FilterParser.LogicalOperation op, Object leftOperand, Object rightOperand) {
return handleLogicalOperation(op, leftOperand, rightOperand);
@@ -72,28 +72,33 @@ public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
}
@Override
- @SuppressWarnings("unchecked")
public Object build(FilterParser.Operation opId, Object leftOperand, Object rightOperand) throws Exception {
// Assume column is on the left
- return handleSimpleOperations(opId,
- (FilterParser.ColumnIndex) leftOperand,
- (FilterParser.Constant) rightOperand);
+ return handleSimpleOperations(
+ opId,
+ (FilterParser.ColumnIndex) leftOperand,
+ (FilterParser.Constant) rightOperand
+ );
}
@Override
- public Object build(FilterParser.Operation operation, Object operand) throws Exception {
- if (operation == FilterParser.Operation.HDOP_IS_NULL || operation == FilterParser.Operation.HDOP_IS_NOT_NULL) {
- // use null for the constant value of null comparison
+ public Object build(FilterParser.Operation operation, Object operand) throws UnsupportedOperationException {
+ if (
+ operation == FilterParser.Operation.HDOP_IS_NULL ||
+ operation == FilterParser.Operation.HDOP_IS_NOT_NULL
+ ) {
+ // Use null for the constant value of null comparison
return handleSimpleOperations(operation, (FilterParser.ColumnIndex) operand, null);
- } else {
- throw new Exception("Unsupported unary operation " + operation);
+ }
+ else {
+ throw new UnsupportedOperationException("Unsupported unary operation '" + operation + "'");
}
}
/*
- * Handles simple column-operator-constant expressions Creates a special
- * filter in the case the column is the row key column
+ * Handles simple column-operator-constant expressions.
+ * Creates a special filter in the case the column is the row key column
*/
private BasicFilter handleSimpleOperations(FilterParser.Operation opId,
FilterParser.ColumnIndex column,
@@ -102,8 +107,7 @@ public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
}
/**
- * Handles AND of already calculated expressions. Currently only AND, in the
- * future OR can be added
+ * Handles AND of already calculated expressions.
*
* Four cases here:
* <ol>
@@ -135,7 +139,6 @@ public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
}
private Object handleLogicalOperation(FilterParser.LogicalOperation operator, Object leftOperand, Object rightOperand) {
-
List<Object> result = new LinkedList<>();
result.add(leftOperand);
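`getFilterObject()` now declares only `java.text.ParseException`: any failure inside the parser, including a result that is not a filter, is caught and rethrown with an error offset of 0. A minimal sketch of that narrowing follows; `FilterErrorSketch` and `parseRaw()` are hypothetical stand-ins for the plugin class and `FilterParser.parse()`, and the empty-string check only imitates a parse failure.

```java
import java.text.ParseException;

public class FilterErrorSketch {
    // Hypothetical stand-in for FilterParser.parse(): rejects an empty filter string
    // with a generic Exception, reusing the message the plugin produces
    static Object parseRaw(String filterString) throws Exception {
        if (filterString.isEmpty()) {
            throw new Exception("'" + filterString + "' could not be resolved to a filter");
        }
        return filterString;
    }

    // Same narrowing as getFilterObject(): every failure becomes a ParseException
    // at offset 0; only the message survives, not the original exception type
    static Object getFilterObject(String filterString) throws ParseException {
        try {
            return parseRaw(filterString);
        } catch (Exception e) {
            throw new ParseException(e.getMessage(), 0);
        }
    }

    public static void main(String[] args) {
        try {
            getFilterObject("");
        } catch (ParseException e) {
            System.out.println(e.getMessage() + " (offset " + e.getErrorOffset() + ")");
        }
    }
}
```

Callers therefore need a single `catch (ParseException e)`; the trade-off is that the original exception type and stack trace are lost, because only the message is carried over.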
http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
index 914b7d9..4971269 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
@@ -27,296 +27,311 @@ import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.plugins.jdbc.utils.ByteUtil;
import org.apache.hawq.pxf.plugins.jdbc.utils.DbProduct;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
/**
- * Fragmenter class for JDBC data resources.
- *
- * Extends the {@link Fragmenter} abstract class, with the purpose of transforming
- * an input data path (an JDBC Database table name and user request parameters) into a list of regions
- * that belong to this table.
- * <br>
- * The parameter Patterns<br>
- * There are three parameters, the format is as follows:<br>
- * <pre>
- * <code>PARTITION_BY=column_name:column_type&RANGE=start_value[:end_value]&INTERVAL=interval_num[:interval_unit]</code>
- * </pre>
- * The <code>PARTITION_BY</code> parameter can be split by colon(':'),the <code>column_type</code> current supported : <code>date,int,enum</code> .
- * The Date format is 'yyyy-MM-dd'. <br>
- * The <code>RANGE</code> parameter can be split by colon(':') ,used to identify the starting range of each fragment.
- * The range is left-closed, ie:<code> '>= start_value AND < end_value' </code>.If the <code>column_type</code> is <code>int</code>,
- * the <code>end_value</code> can be empty. If the <code>column_type</code>is <code>enum</code>,the parameter <code>RANGE</code> can be empty. <br>
- * The <code>INTERVAL</code> parameter can be split by colon(':'), indicate the interval value of one fragment.
- * When <code>column_type</code> is <code>date</code>,this parameter must be split by colon, and <code>interval_unit</code> can be <code>year,month,day</code>.
- * When <code>column_type</code> is <code>int</code>, the <code>interval_unit</code> can be empty.
- * When <code>column_type</code> is <code>enum</code>,the <code>INTERVAL</code> parameter can be empty.
- * <br>
- * <p>
- * The syntax examples is :<br>
- * <code>PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month'</code> <br>
- * <code>PARTITION_BY=year:int&RANGE=2008:2010&INTERVAL=1</code> <br>
- * <code>PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad</code>
- * </p>
+ * JDBC fragmenter
*
+ * Splits the query to allow multiple simultaneous SELECTs
*/
public class JdbcPartitionFragmenter extends Fragmenter {
- String[] partitionBy = null;
- String[] range = null;
- String[] interval = null;
- PartitionType partitionType = null;
- String partitionColumn = null;
- IntervalType intervalType = null;
- int intervalNum = 1;
+ /**
+ * Insert fragment constraints into the SQL query.
+ *
+ * @param inputData InputData of the fragment
+ * @param dbName Database name (affects the behaviour for DATE partitions)
+ * @param query SQL query to append the constraints to. The query may already contain a WHERE clause
+ */
+ public static void buildFragmenterSql(InputData inputData, String dbName, StringBuilder query) {
+ if (inputData.getUserProperty("PARTITION_BY") == null) {
+ return;
+ }
- //when partitionType is DATE,it is valid
- Calendar rangeStart = null;
- Calendar rangeEnd = null;
+ byte[] meta = inputData.getFragmentMetadata();
+ if (meta == null) {
+ return;
+ }
+ String[] partitionBy = inputData.getUserProperty("PARTITION_BY").split(":");
+ String partitionColumn = partitionBy[0];
+ PartitionType partitionType = PartitionType.typeOf(partitionBy[1]);
+ DbProduct dbProduct = DbProduct.getDbProduct(dbName);
+ if (!query.toString().contains("WHERE")) {
+ query.append(" WHERE ");
+ }
+ else {
+ query.append(" AND ");
+ }
- enum PartitionType {
- DATE,
- INT,
- ENUM;
+ switch (partitionType) {
+ case DATE: {
+ byte[][] newb = ByteUtil.splitBytes(meta);
+ Date fragStart = new Date(ByteUtil.toLong(newb[0]));
+ Date fragEnd = new Date(ByteUtil.toLong(newb[1]));
- public static PartitionType getType(String str) {
- return valueOf(str.toUpperCase());
- }
- }
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
+ query.append(partitionColumn).append(" >= ").append(dbProduct.wrapDate(df.format(fragStart)));
+ query.append(" AND ");
+ query.append(partitionColumn).append(" < ").append(dbProduct.wrapDate(df.format(fragEnd)));
- enum IntervalType {
- DAY,
- MONTH,
- YEAR;
+ break;
+ }
+ case INT: {
+ byte[][] newb = ByteUtil.splitBytes(meta);
+ long fragStart = ByteUtil.toLong(newb[0]);
+ long fragEnd = ByteUtil.toLong(newb[1]);
- public static IntervalType type(String str) {
- return valueOf(str.toUpperCase());
+ query.append(partitionColumn).append(" >= ").append(fragStart);
+ query.append(" AND ");
+ query.append(partitionColumn).append(" < ").append(fragEnd);
+ break;
+ }
+ case ENUM: {
+ query.append(partitionColumn).append(" = '").append(new String(meta)).append("'");
+ break;
+ }
}
}
/**
- * Constructor for JdbcPartitionFragmenter.
+ * Class constructor.
*
- * @param inConf input data such as which Jdbc table to scan
- * @throws UserDataException if the request parameter is malformed
+ * @param inputData PXF InputData
+ * @throws UserDataException if the request parameter is malformed
*/
- public JdbcPartitionFragmenter(InputData inConf) throws UserDataException {
- super(inConf);
- if (inConf.getUserProperty("PARTITION_BY") == null)
+ public JdbcPartitionFragmenter(InputData inputData) throws UserDataException {
+ super(inputData);
+ if (inputData.getUserProperty("PARTITION_BY") == null) {
return;
+ }
+
+ // PARTITION_BY
try {
- partitionBy = inConf.getUserProperty("PARTITION_BY").split(":");
- partitionColumn = partitionBy[0];
- partitionType = PartitionType.getType(partitionBy[1]);
- } catch (IllegalArgumentException | ArrayIndexOutOfBoundsException e1) {
- throw new UserDataException("The parameter 'PARTITION_BY' invalid, the pattern is 'column_name:date|int|enum'");
+ partitionType = PartitionType.typeOf(
+ inputData.getUserProperty("PARTITION_BY").split(":")[1]
+ );
+ }
+ catch (IllegalArgumentException | ArrayIndexOutOfBoundsException ex) {
+ throw new UserDataException("The parameter 'PARTITION_BY' is invalid. The pattern is '<column_name>:date|int|enum'");
}
- //parse and validate parameter-RANGE
+ // RANGE
try {
- String rangeStr = inConf.getUserProperty("RANGE");
+ String rangeStr = inputData.getUserProperty("RANGE");
if (rangeStr != null) {
range = rangeStr.split(":");
- if (range.length == 1 && partitionType != PartitionType.ENUM)
- throw new UserDataException("The parameter 'RANGE' does not specify '[:end_value]'");
- } else
+ if (range.length == 1 && partitionType != PartitionType.ENUM) {
+ throw new UserDataException("The parameter 'RANGE' must specify ':<end_value>' for this partition type");
+ }
+ }
+ else {
throw new UserDataException("The parameter 'RANGE' must be specified along with 'PARTITION_BY'");
- } catch (IllegalArgumentException e1) {
- throw new UserDataException("The parameter 'RANGE' invalid, the pattern is 'start_value[:end_value]'");
+ }
+
+ if (partitionType == PartitionType.DATE) {
+ // Parse DATE partition type values
+ try {
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
+ rangeDateStart = Calendar.getInstance();
+ rangeDateStart.setTime(df.parse(range[0]));
+ rangeDateEnd = Calendar.getInstance();
+ rangeDateEnd.setTime(df.parse(range[1]));
+ }
+ catch (ParseException e) {
+ throw new UserDataException("The parameter 'RANGE' has an invalid date format. The correct format is 'yyyy-MM-dd'");
+ }
+ }
+ else if (partitionType == PartitionType.INT) {
+ // Parse INT partition type values
+ try {
+ rangeIntStart = Long.parseLong(range[0]);
+ rangeIntEnd = Long.parseLong(range[1]);
+ }
+ catch (NumberFormatException e) {
+ throw new UserDataException("The parameter 'RANGE' is invalid. Both range boundaries must be integers");
+ }
+ }
+ }
+ catch (IllegalArgumentException ex) {
+ throw new UserDataException("The parameter 'RANGE' is invalid. The pattern is '<start_value>[:<end_value>]'");
}
- //parse and validate parameter-INTERVAL
+ // INTERVAL
try {
- String intervalStr = inConf.getUserProperty("INTERVAL");
+ String intervalStr = inputData.getUserProperty("INTERVAL");
if (intervalStr != null) {
- interval = intervalStr.split(":");
- intervalNum = Integer.parseInt(interval[0]);
- if (interval.length > 1)
- intervalType = IntervalType.type(interval[1]);
- if (interval.length == 1 && partitionType == PartitionType.DATE)
- throw new UserDataException("The parameter 'INTERVAL' does not specify unit [:year|month|day]");
- } else if (partitionType != PartitionType.ENUM)
- throw new UserDataException("The parameter 'INTERVAL' must be specified along with 'PARTITION_BY'");
- if (intervalNum < 1)
- throw new UserDataException("The parameter 'INTERVAL' must > 1, but actual is '" + intervalNum + "'");
- } catch (IllegalArgumentException e1) {
- throw new UserDataException("The parameter 'INTERVAL' invalid, the pattern is 'interval_num[:interval_unit]'");
- }
+ String[] interval = intervalStr.split(":");
+ try {
+ intervalNum = Long.parseLong(interval[0]);
+ if (intervalNum < 1) {
+ throw new UserDataException("The '<interval_num>' in the parameter 'INTERVAL' must be at least 1, but is " + intervalNum);
+ }
+ }
+ catch (NumberFormatException ex) {
+ throw new UserDataException("The '<interval_num>' in the parameter 'INTERVAL' must be an integer");
+ }
- //parse any date values
- try {
- if (partitionType == PartitionType.DATE) {
- SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
- rangeStart = Calendar.getInstance();
- rangeStart.setTime(df.parse(range[0]));
- rangeEnd = Calendar.getInstance();
- rangeEnd.setTime(df.parse(range[1]));
+ // Intervals of type DATE
+ if (interval.length > 1) {
+ intervalType = IntervalType.typeOf(interval[1]);
+ }
+ if (interval.length == 1 && partitionType == PartitionType.DATE) {
+ throw new UserDataException("The parameter 'INTERVAL' must specify a unit (':year|month|day') for the 'date' partition type");
+ }
+ }
+ else if (partitionType != PartitionType.ENUM) {
+ throw new UserDataException("The parameter 'INTERVAL' must be specified along with 'PARTITION_BY' for this partition type");
}
- } catch (ParseException e) {
- throw new UserDataException("The parameter 'RANGE' has invalid date format. Expected format is 'YYYY-MM-DD'");
+ }
+ catch (IllegalArgumentException ex) {
+ throw new UserDataException("The parameter 'INTERVAL' is invalid. The pattern is '<interval_num>[:<interval_unit>]'");
}
}
/**
- * Returns statistics for Jdbc table. Currently it's not implemented.
* @throws UnsupportedOperationException ANALYZE for Jdbc plugin is not supported
*/
@Override
public FragmentsStats getFragmentsStats() throws UnsupportedOperationException {
- throw new UnsupportedOperationException("ANALYZE for Jdbc plugin is not supported");
+ throw new UnsupportedOperationException("ANALYZE for JDBC plugin is not supported");
}
/**
- * Returns list of fragments containing all of the
- * Jdbc table data.
+ * getFragments() implementation
*
- * @return a list of fragments
- * @throws Exception if assign host error
+ * @return a list of fragments to be passed to PXF segments
*/
@Override
- public List<Fragment> getFragments() throws Exception {
+ public List<Fragment> getFragments() {
if (partitionType == null) {
- byte[] fragmentMetadata = null;
- byte[] userData = null;
- Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData);
+ // No partition case
+ Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, null);
fragments.add(fragment);
- return prepareHosts(fragments);
+ return fragments;
}
+
switch (partitionType) {
case DATE: {
- int currInterval = intervalNum;
+ Calendar fragStart = rangeDateStart;
- Calendar fragStart = rangeStart;
- while (fragStart.before(rangeEnd)) {
- Calendar fragEnd = (Calendar) fragStart.clone();
+ while (fragStart.before(rangeDateEnd)) {
+ // Calculate a new fragment
+ Calendar fragEnd = (Calendar)fragStart.clone();
switch (intervalType) {
case DAY:
- fragEnd.add(Calendar.DAY_OF_MONTH, currInterval);
+ fragEnd.add(Calendar.DAY_OF_MONTH, (int)intervalNum);
break;
case MONTH:
- fragEnd.add(Calendar.MONTH, currInterval);
+ fragEnd.add(Calendar.MONTH, (int)intervalNum);
break;
case YEAR:
- fragEnd.add(Calendar.YEAR, currInterval);
+ fragEnd.add(Calendar.YEAR, (int)intervalNum);
break;
}
- if (fragEnd.after(rangeEnd))
- fragEnd = (Calendar) rangeEnd.clone();
+ if (fragEnd.after(rangeDateEnd))
+ fragEnd = (Calendar)rangeDateEnd.clone();
- //make metadata of this fragment , converts the date to a millisecond,then get bytes.
+ // Convert to byte[]
byte[] msStart = ByteUtil.getBytes(fragStart.getTimeInMillis());
byte[] msEnd = ByteUtil.getBytes(fragEnd.getTimeInMillis());
byte[] fragmentMetadata = ByteUtil.mergeBytes(msStart, msEnd);
- byte[] userData = new byte[0];
- Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData);
+ // Write fragment
+ Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, fragmentMetadata);
fragments.add(fragment);
- //continue next fragment.
+ // Prepare for the next fragment
fragStart = fragEnd;
}
break;
}
case INT: {
- int rangeStart = Integer.parseInt(range[0]);
- int rangeEnd = Integer.parseInt(range[1]);
- int currInterval = intervalNum;
+ long fragStart = rangeIntStart;
- //validate : curr_interval > 0
- int fragStart = rangeStart;
- while (fragStart < rangeEnd) {
- int fragEnd = fragStart + currInterval;
- if (fragEnd > rangeEnd) fragEnd = rangeEnd;
+ while (fragStart < rangeIntEnd) {
+ // Calculate a new fragment
+ long fragEnd = fragStart + intervalNum;
+ if (fragEnd > rangeIntEnd) {
+ fragEnd = rangeIntEnd;
+ }
+ // Convert to byte[]
byte[] bStart = ByteUtil.getBytes(fragStart);
byte[] bEnd = ByteUtil.getBytes(fragEnd);
byte[] fragmentMetadata = ByteUtil.mergeBytes(bStart, bEnd);
- byte[] userData = new byte[0];
- Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData);
+ // Write fragment
+ Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, fragmentMetadata);
fragments.add(fragment);
- //continue next fragment.
- fragStart = fragEnd;// + 1;
+ // Prepare for the next fragment
+ fragStart = fragEnd;
}
break;
}
- case ENUM:
+ case ENUM: {
for (String frag : range) {
byte[] fragmentMetadata = frag.getBytes();
- Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, new byte[0]);
+ Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, fragmentMetadata);
fragments.add(fragment);
}
break;
+ }
}
- return prepareHosts(fragments);
+ return fragments;
}
- /**
- * For each fragment , assigned a host address.
- * In Jdbc Plugin, 'replicas' is the host address of the PXF engine that is running, not the database engine.
- * Since the other PXF host addresses can not be probed, only the host name of the current PXF engine is returned.
- * @param fragments a list of fragments
- * @return a list of fragments that assigned hosts.
- * @throws UnknownHostException if InetAddress.getLocalHost error.
- */
- public static List<Fragment> prepareHosts(List<Fragment> fragments) throws UnknownHostException {
- for (Fragment fragment : fragments) {
- String pxfHost = InetAddress.getLocalHost().getHostAddress();
- String[] hosts = new String[]{pxfHost};
- fragment.setReplicas(hosts);
- }
+ // Partition parameters (filled by class constructor)
+ private String[] range = null;
+ private PartitionType partitionType = null;
+ private long intervalNum;
- return fragments;
- }
+ // Partition parameters for INT partitions (filled by class constructor)
+ private long rangeIntStart;
+ private long rangeIntEnd;
- public String buildFragmenterSql(String dbName, String originSql) {
- byte[] meta = inputData.getFragmentMetadata();
- if (meta == null)
- return originSql;
+ // Partition parameters for DATE partitions (filled by class constructor)
+ private IntervalType intervalType;
+ private Calendar rangeDateStart;
+ private Calendar rangeDateEnd;
- DbProduct dbProduct = DbProduct.getDbProduct(dbName);
+ private static enum PartitionType {
+ DATE,
+ INT,
+ ENUM;
- StringBuilder sb = new StringBuilder(originSql);
- if (!originSql.contains("WHERE"))
- sb.append(" WHERE 1=1 ");
+ public static PartitionType typeOf(String str) {
+ return valueOf(str.toUpperCase());
+ }
+ }
- sb.append(" AND ");
- switch (partitionType) {
- case DATE: {
- SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
- //parse metadata of this fragment
- //validate: the length of metadata == 16 (long)
- byte[][] newb = ByteUtil.splitBytes(meta, 8);
- Date fragStart = new Date(ByteUtil.toLong(newb[0]));
- Date fragEnd = new Date(ByteUtil.toLong(newb[1]));
+ private static enum IntervalType {
+ DAY,
+ MONTH,
+ YEAR;
- sb.append(partitionColumn).append(" >= ").append(dbProduct.wrapDate(df.format(fragStart)));
- sb.append(" AND ");
- sb.append(partitionColumn).append(" < ").append(dbProduct.wrapDate(df.format(fragEnd)));
+ public static IntervalType typeOf(String str) {
+ return valueOf(str.toUpperCase());
+ }
+ }
- break;
- }
- case INT: {
- //validate: the length of metadata == 8 (int)
- byte[][] newb = ByteUtil.splitBytes(meta, 4);
- int fragStart = ByteUtil.toInt(newb[0]);
- int fragEnd = ByteUtil.toInt(newb[1]);
- sb.append(partitionColumn).append(" >= ").append(fragStart);
- sb.append(" AND ");
- sb.append(partitionColumn).append(" < ").append(fragEnd);
- break;
- }
- case ENUM:
- sb.append(partitionColumn).append("='").append(new String(meta)).append("'");
- break;
+ // A PXF engine to use as a host for fragments
+ private static final String[] pxfHosts;
+ static {
+ String[] localhost = {"localhost"};
+ try {
+ localhost[0] = InetAddress.getLocalHost().getHostAddress();
+ }
+ catch (UnknownHostException ex) {
+ // Fall back to "localhost", which can always be used, if the local host address cannot be resolved
}
- return sb.toString();
+ pxfHosts = localhost;
}
}
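For INT partitions, the fragmenter cuts `RANGE=start:end` into half-open intervals of `INTERVAL` values, stores each interval as two longs in the fragment metadata, and `buildFragmenterSql()` later turns the metadata back into `col >= start AND col < end`, appended after `WHERE` or `AND` depending on whether the query already contains a WHERE clause. The sketch below assumes `java.nio.ByteBuffer` (big-endian longs) as a stand-in for the plugin's `ByteUtil` helpers, whose exact byte layout is not shown in this diff; the class `IntPartitionSketch` and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class IntPartitionSketch {
    // Splits [rangeStart, rangeEnd) into half-open fragments of intervalNum values;
    // the last fragment is clamped to rangeEnd. intervalNum must be at least 1,
    // which the fragmenter's constructor enforces.
    static List<long[]> split(long rangeStart, long rangeEnd, long intervalNum) {
        List<long[]> fragments = new ArrayList<>();
        long fragStart = rangeStart;
        while (fragStart < rangeEnd) {
            long fragEnd = Math.min(fragStart + intervalNum, rangeEnd);
            fragments.add(new long[] {fragStart, fragEnd});
            fragStart = fragEnd;
        }
        return fragments;
    }

    // Assumed stand-in for ByteUtil.getBytes()/mergeBytes(): two big-endian longs (16 bytes)
    static byte[] toMetadata(long[] fragment) {
        return ByteBuffer.allocate(16).putLong(fragment[0]).putLong(fragment[1]).array();
    }

    // Mirrors the INT branch of buildFragmenterSql(): decode the metadata and append
    // "col >= start AND col < end" after WHERE, or after AND if WHERE is already present
    static void appendConstraint(StringBuilder query, String column, byte[] meta) {
        ByteBuffer buffer = ByteBuffer.wrap(meta);
        long fragStart = buffer.getLong();
        long fragEnd = buffer.getLong();
        query.append(query.indexOf("WHERE") < 0 ? " WHERE " : " AND ");
        query.append(column).append(" >= ").append(fragStart);
        query.append(" AND ");
        query.append(column).append(" < ").append(fragEnd);
    }

    public static void main(String[] args) {
        // RANGE=2008:2011, INTERVAL=2 -> two fragments
        for (long[] fragment : split(2008, 2011, 2)) {
            StringBuilder query = new StringBuilder("SELECT id FROM t");
            appendConstraint(query, "year", toMetadata(fragment));
            System.out.println(query);
        }
    }
}
```

With `RANGE=2008:2011` and `INTERVAL=2` this yields `[2008, 2010)` and `[2010, 2011)`, the last fragment being clamped to the range end as in `getFragments()`. Like the diff's `query.toString().contains("WHERE")`, the check is a case-sensitive substring test, so a table or column name containing `WHERE` would make it append `AND` instead.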
http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
index c0af405..6715508 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
@@ -19,95 +19,209 @@ package org.apache.hawq.pxf.plugins.jdbc;
* under the License.
*/
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Plugin;
-import java.sql.*;
+import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
+import java.sql.Statement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
- * This class resolves the jdbc connection parameter and manages the opening and closing of the jdbc connection.
- * Implemented subclasses: {@link JdbcReadAccessor}.
+ * JDBC tables plugin (base class)
*
+ * Implemented subclasses: {@link JdbcAccessor}, {@link JdbcResolver}.
*/
public class JdbcPlugin extends Plugin {
- private static final Log LOG = LogFactory.getLog(JdbcPlugin.class);
-
- //jdbc connection parameters
- protected String jdbcDriver = null;
- protected String dbUrl = null;
- protected String user = null;
- protected String pass = null;
- protected String tblName = null;
- protected int batchSize = 100;
-
- //jdbc connection
- protected Connection dbConn = null;
- //database type, from DatabaseMetaData.getDatabaseProductName()
- protected String dbProduct = null;
-
/**
- * parse input data
+ * Class constructor
+ *
+ * @param input {@link InputData} provided by PXF
*
- * @param input the input data
- * @throws UserDataException if the request parameter is malformed
+ * @throws UserDataException if one of the required request parameters is not set
*/
public JdbcPlugin(InputData input) throws UserDataException {
super(input);
+
jdbcDriver = input.getUserProperty("JDBC_DRIVER");
+ if (jdbcDriver == null) {
+ throw new UserDataException("JDBC_DRIVER is a required parameter");
+ }
+
dbUrl = input.getUserProperty("DB_URL");
- user = input.getUserProperty("USER");
- pass = input.getUserProperty("PASS");
- String strBatch = input.getUserProperty("BATCH_SIZE");
- if (strBatch != null) {
- batchSize = Integer.parseInt(strBatch);
+ if (dbUrl == null) {
+ throw new UserDataException("DB_URL is a required parameter");
}
- if (jdbcDriver == null) {
- throw new UserDataException("JDBC_DRIVER must be set");
+ tableName = input.getDataSource();
+ if (tableName == null) {
+ throw new UserDataException("Data source must be provided");
}
- if (dbUrl == null) {
- throw new UserDataException("DB_URL must be set(read)");
+ /*
+ At the moment, when writing into some table, the table name is
+ concatenated with a special string that is necessary to write into HDFS.
+ However, a raw table name is necessary in case of JDBC.
+ The correct table name is extracted here.
+ */
+ Matcher matcher = tableNamePattern.matcher(tableName);
+ if (matcher.matches()) {
+ inputData.setDataSource(matcher.group(1));
+ tableName = input.getDataSource();
}
- tblName = input.getDataSource();
- if (tblName == null) {
- throw new UserDataException("TABLE_NAME must be set as DataSource.");
+ columns = inputData.getTupleDescription();
+ if (columns == null) {
+ throw new UserDataException("Tuple description must be provided");
+ }
+
+ // This parameter is not required. The default value is null
+ user = input.getUserProperty("USER");
+ if (user != null) {
+ pass = input.getUserProperty("PASS");
+ }
+
+ // This parameter is not required. The default value is DEFAULT_BATCH_SIZE (100)
+ String batchSizeRaw = input.getUserProperty("BATCH_SIZE");
+ if (batchSizeRaw != null) {
+ try {
+ batchSize = Integer.parseInt(batchSizeRaw);
+ if (batchSize < 0) {
+ throw new NumberFormatException();
+ } else if (batchSize == 0) {
+ // Treat 0 as 1, i.e. one row per batch
+ batchSize = 1;
+ }
+ batchSizeIsSetByUser = true;
+ }
+ catch (NumberFormatException e) {
+ throw new UserDataException("BATCH_SIZE is incorrect: must be a non-negative integer");
+ }
}
- }
- public String getTableName() {
- return tblName;
+ // This parameter is not required. The default value is 1
+ String poolSizeRaw = input.getUserProperty("POOL_SIZE");
+ if (poolSizeRaw != null) {
+ try {
+ poolSize = Integer.parseInt(poolSizeRaw);
+ }
+ catch (NumberFormatException e) {
+ throw new UserDataException("POOL_SIZE is incorrect: must be an integer");
+ }
+ }
}
- protected Connection openConnection() throws ClassNotFoundException, SQLException {
+ /**
+ * Open a new JDBC connection
+ *
+ * @throws ClassNotFoundException if the JDBC driver was not found
+ * @throws SQLException if a database access error occurs
+ * @throws SQLTimeoutException if the connection attempt times out
+ */
+ public Connection getConnection() throws ClassNotFoundException, SQLException, SQLTimeoutException {
+ Connection connection;
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Open JDBC: driver=%s,url=%s,user=%s,pass=%s,table=%s",
- jdbcDriver, dbUrl, user, pass, tblName));
- }
- if (dbConn == null || dbConn.isClosed()) {
- Class.forName(jdbcDriver);
if (user != null) {
- dbConn = DriverManager.getConnection(dbUrl, user, pass);
- } else {
- dbConn = DriverManager.getConnection(dbUrl);
+ LOG.debug(String.format("Open JDBC connection: driver=%s, url=%s, user=%s, pass=%s, table=%s",
+ jdbcDriver, dbUrl, user, pass, tableName));
+ }
+ else {
+ LOG.debug(String.format("Open JDBC connection: driver=%s, url=%s, table=%s",
+ jdbcDriver, dbUrl, tableName));
}
- DatabaseMetaData meta = dbConn.getMetaData();
- dbProduct = meta.getDatabaseProductName();
}
- return dbConn;
+ Class.forName(jdbcDriver);
+ if (user != null) {
+ connection = DriverManager.getConnection(dbUrl, user, pass);
+ }
+ else {
+ connection = DriverManager.getConnection(dbUrl);
+ }
+ return connection;
}
- protected void closeConnection() {
+ /**
+ * Close a JDBC connection
+ */
+ public static void closeConnection(Connection connection) {
try {
- if (dbConn != null) {
- dbConn.close();
- dbConn = null;
+ if ((connection != null) && (!connection.isClosed())) {
+ if ((connection.getMetaData().supportsTransactions()) && (!connection.getAutoCommit())) {
+ connection.commit();
+ }
+ connection.close();
}
- } catch (SQLException e) {
- LOG.error("Close db connection error . ", e);
}
+ catch (SQLException e) {
+ LOG.error("JDBC connection close error", e);
+ }
+ }
+
+ /**
+ * Prepare a JDBC PreparedStatement
+ *
+ * @throws ClassNotFoundException if the JDBC driver was not found
+ * @throws SQLException if a database access error occurs
+ * @throws SQLTimeoutException if a database operation times out
+ */
+ public PreparedStatement getPreparedStatement(Connection connection, String query) throws SQLException, SQLTimeoutException, ClassNotFoundException {
+ if ((connection == null) || (query == null)) {
+ throw new IllegalArgumentException("The provided query or connection is null");
+ }
+ if (connection.getMetaData().supportsTransactions()) {
+ connection.setAutoCommit(false);
+ }
+ return connection.prepareStatement(query);
}
+
+ /**
+ * Close a JDBC Statement (and the connection it is based on)
+ */
+ public static void closeStatement(Statement statement) {
+ if (statement == null) {
+ return;
+ }
+ Connection connection = null;
+ try {
+ if (!statement.isClosed()) {
+ connection = statement.getConnection();
+ statement.close();
+ }
+ }
+ catch (Exception e) {} // Best effort: errors are ignored; the connection, if it was obtained, is still closed below
+ closeConnection(connection);
+ }
+
+ // JDBC parameters
+ protected String jdbcDriver = null;
+ protected String dbUrl = null;
+ protected String user = null;
+ protected String pass = null;
+
+ protected String tableName = null;
+
+ // '100' is a recommended value: https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754
+ public static final int DEFAULT_BATCH_SIZE = 100;
+ // After argument parsing, this value is guaranteed to be >= 1
+ protected int batchSize = DEFAULT_BATCH_SIZE;
+ protected boolean batchSizeIsSetByUser = false;
+
+ protected int poolSize = 1;
+
+ // Columns description
+ protected ArrayList<ColumnDescriptor> columns = null;
+
+
+ private static final Log LOG = LogFactory.getLog(JdbcPlugin.class);
+
+ // At the moment, when writing into some table, the table name is concatenated with a special string that is necessary to write into HDFS. However, a raw table name is necessary for JDBC. This Pattern is used to extract the correct table name from the given InputData.dataSource
+ private static final Pattern tableNamePattern = Pattern.compile("/(.*)/[0-9]*-[0-9]*_[0-9]*");
}
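The BATCH_SIZE and table-name rules in the JdbcPlugin constructor above can be exercised in isolation. The following is a hypothetical standalone sketch: the class JdbcParamsSketch and its methods are not part of PXF, IllegalArgumentException stands in for PXF's UserDataException, and it assumes a BATCH_SIZE of 0 is meant to become 1 (as the "non-negative integer" error message and the `batchSize == 0` branch suggest). The data source `/sales/1517-0000000042_3` is an invented value shaped to match tableNamePattern, not a captured PXF path.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical standalone sketch (not part of PXF) of two rules from the
 * JdbcPlugin constructor: BATCH_SIZE validation and table-name extraction.
 */
class JdbcParamsSketch {
    // Same value as JdbcPlugin.DEFAULT_BATCH_SIZE
    static final int DEFAULT_BATCH_SIZE = 100;

    // Same regular expression as JdbcPlugin.tableNamePattern
    private static final Pattern TABLE_NAME_PATTERN =
            Pattern.compile("/(.*)/[0-9]*-[0-9]*_[0-9]*");

    /**
     * Returns the effective batch size. Assumption: negative or non-numeric
     * input is an error, and 0 is turned into 1.
     */
    static int parseBatchSize(String raw) {
        if (raw == null) {
            return DEFAULT_BATCH_SIZE; // BATCH_SIZE was not set by the user
        }
        String error = "BATCH_SIZE is incorrect: must be a non-negative integer";
        int value;
        try {
            value = Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(error, e);
        }
        if (value < 0) {
            throw new IllegalArgumentException(error);
        }
        return value == 0 ? 1 : value;
    }

    /** Returns the captured table name if the whole pattern matches, else the input unchanged. */
    static String extractTableName(String dataSource) {
        Matcher matcher = TABLE_NAME_PATTERN.matcher(dataSource);
        return matcher.matches() ? matcher.group(1) : dataSource;
    }

    public static void main(String[] args) {
        System.out.println(parseBatchSize("0"));  // 1
        System.out.println(parseBatchSize(null)); // 100
        // Invented data source shaped to match the pattern
        System.out.println(extractTableName("/sales/1517-0000000042_3")); // sales
        System.out.println(extractTableName("sales"));                    // sales
    }
}
```

Keeping the parsing in a pure function like this makes the edge cases (unset, 0, negative, non-numeric) easy to check without constructing an InputData object.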
http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java
deleted file mode 100644
index 2ca9a94..0000000
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package org.apache.hawq.pxf.plugins.jdbc;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.hawq.pxf.api.OneRow;
-import org.apache.hawq.pxf.api.ReadAccessor;
-import org.apache.hawq.pxf.api.UserDataException;
-import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
-import org.apache.hawq.pxf.api.utilities.InputData;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.sql.*;
-import java.util.ArrayList;
-
-/**
- * Accessor for Jdbc tables. The accessor will open and read a partition belonging
- * to a Jdbc table. JdbcReadAccessor generates and executes SQL from filter and
- * fragmented information, uses {@link JdbcReadResolver } to read the ResultSet, and generates
- * the data type - List {@link OneRow} that HAWQ needs.
- */
-public class JdbcReadAccessor extends JdbcPlugin implements ReadAccessor {
- private static final Log LOG = LogFactory.getLog(JdbcReadAccessor.class);
-
- WhereSQLBuilder filterBuilder = null;
- private ColumnDescriptor keyColumn = null;
-
- private String querySql = null;
- private Statement statement = null;
- private ResultSet resultSet = null;
-
- public JdbcReadAccessor(InputData input) throws UserDataException {
- super(input);
- filterBuilder = new WhereSQLBuilder(inputData);
-
- //buid select statement (not contain where statement)
- ArrayList<ColumnDescriptor> columns = input.getTupleDescription();
- StringBuilder sb = new StringBuilder();
- sb.append("SELECT ");
- for (int i = 0; i < columns.size(); i++) {
- ColumnDescriptor column = columns.get(i);
- if (column.isKeyColumn())
- keyColumn = column;
- if (i > 0) sb.append(",");
- sb.append(column.columnName());
- }
- sb.append(" FROM ").append(getTableName());
- querySql = sb.toString();
- }
-
- /**
- * open db connection, execute query sql
- */
- @Override
- public boolean openForRead() throws Exception {
- if (statement != null && !statement.isClosed())
- return true;
- super.openConnection();
-
- statement = dbConn.createStatement();
-
- resultSet = executeQuery(querySql);
-
- return true;
- }
-
- public ResultSet executeQuery(String sql) throws Exception {
- String query = sql;
- if (inputData.hasFilter()) {
- //parse filter string , build where statement
- String whereSql = filterBuilder.buildWhereSQL(dbProduct);
-
- if (whereSql != null) {
- query = query + " WHERE " + whereSql;
- }
- }
-
- //according to the fragment information, rewriting sql
- JdbcPartitionFragmenter fragmenter = new JdbcPartitionFragmenter(inputData);
- query = fragmenter.buildFragmenterSql(dbProduct, query);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("executeQuery: " + query);
- }
-
- return statement.executeQuery(query);
- }
-
- @Override
- public OneRow readNextObject() throws Exception {
- if (resultSet.next()) {
- return new OneRow(null, resultSet);
- }
- return null;
- }
-
- @Override
- public void closeForRead() throws Exception {
- if (statement != null && !statement.isClosed()) {
- statement.close();
- statement = null;
- }
- super.closeConnection();
- }
-}
\ No newline at end of file
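The deleted JdbcReadAccessor built its query in stages: a SELECT list over the tuple description, an optional WHERE clause from WhereSQLBuilder, and finally a rewrite by JdbcPartitionFragmenter. A hypothetical sketch of the first two stages as a pure function follows; SelectQuerySketch and buildSelect are illustrative names, the ", " column separator is a style choice of this sketch, and the WHERE text is passed in rather than produced by WhereSQLBuilder.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: assembles "SELECT ... FROM ... [WHERE ...]". */
class SelectQuerySketch {
    static String buildSelect(List<String> columns, String table, String whereSql) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("At least one column is required");
        }
        StringBuilder sb = new StringBuilder("SELECT ");
        // Projection: column names in tuple-description order
        sb.append(String.join(", ", columns));
        sb.append(" FROM ").append(table);
        // The WHERE clause is optional, e.g. when no filter was pushed down
        if (whereSql != null && !whereSql.isEmpty()) {
            sb.append(" WHERE ").append(whereSql);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("id", "name");
        System.out.println(buildSelect(cols, "sales", "id > 5")); // SELECT id, name FROM sales WHERE id > 5
        System.out.println(buildSelect(cols, "sales", null));     // SELECT id, name FROM sales
    }
}
```

Unlike the deleted code, which only checked `whereSql != null`, the sketch also skips an empty WHERE string, so it never emits a dangling `WHERE`.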