Repository: hawq
Updated Branches:
  refs/heads/master a741655ba -> 472fa2b74


http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java
 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java
deleted file mode 100644
index 1c61537..0000000
--- 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package org.apache.hawq.pxf.plugins.jdbc;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hawq.pxf.api.*;
-import org.apache.hawq.pxf.api.io.DataType;
-import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
-import org.apache.hawq.pxf.api.utilities.InputData;
-import org.apache.hawq.pxf.api.utilities.Plugin;
-
-import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Class JdbcReadResolver Read the Jdbc ResultSet, and generates the data type 
- List {@link OneField}.
- */
-public class JdbcReadResolver extends Plugin implements ReadResolver {
-    private static final Log LOG = LogFactory.getLog(JdbcReadResolver.class);
-    //HAWQ Table column definitions
-    private ArrayList<ColumnDescriptor> columns = null;
-
-    public JdbcReadResolver(InputData input) {
-        super(input);
-        columns = input.getTupleDescription();
-    }
-
-    @Override
-    public List<OneField> getFields(OneRow row) throws Exception {
-        ResultSet result = (ResultSet) row.getData();
-        LinkedList<OneField> fields = new LinkedList<>();
-
-        for (int i = 0; i < columns.size(); i++) {
-            ColumnDescriptor column = columns.get(i);
-            String colName = column.columnName();
-            Object value = null;
-
-            OneField oneField = new OneField();
-            oneField.type = column.columnTypeCode();
-
-            switch (DataType.get(oneField.type)) {
-                case INTEGER:
-                    value = result.getInt(colName);
-                    break;
-                case FLOAT8:
-                    value = result.getDouble(colName);
-                    break;
-                case REAL:
-                    value = result.getFloat(colName);
-                    break;
-                case BIGINT:
-                    value = result.getLong(colName);
-                    break;
-                case SMALLINT:
-                    value = result.getShort(colName);
-                    break;
-                case BOOLEAN:
-                    value = result.getBoolean(colName);
-                    break;
-                case BYTEA:
-                    value = result.getBytes(colName);
-                    break;
-                case VARCHAR:
-                case BPCHAR:
-                case TEXT:
-                case NUMERIC:
-                    value = result.getString(colName);
-                    break;
-                case TIMESTAMP:
-                case DATE:
-                    value = result.getDate(colName);
-                    break;
-                default:
-                    throw new UnsupportedOperationException("Unknwon Field 
Type : " + DataType.get(oneField.type).toString()
-                            + ", Column : " + column.toString());
-            }
-            oneField.val = value;
-            fields.add(oneField);
-        }
-        return fields;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java
new file mode 100644
index 0000000..ab88326
--- /dev/null
+++ 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java
@@ -0,0 +1,367 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.WriteResolver;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * JDBC tables resolver
+ */
+public class JdbcResolver extends JdbcPlugin implements ReadResolver, 
WriteResolver {
+    /**
+     * Class constructor
+     */
+    public JdbcResolver(InputData input) throws UserDataException {
+        super(input);
+    }
+
+    /**
+     * getFields() implementation
+     *
+     * @throws SQLException if the provided {@link OneRow} object is invalid
+     */
+    @Override
+    public List<OneField> getFields(OneRow row) throws SQLException {
+        ResultSet result = (ResultSet) row.getData();
+        LinkedList<OneField> fields = new LinkedList<>();
+
+        for (ColumnDescriptor column : columns) {
+            String colName = column.columnName();
+            Object value = null;
+
+            OneField oneField = new OneField();
+            oneField.type = column.columnTypeCode();
+
+            switch (DataType.get(oneField.type)) {
+                case INTEGER:
+                    value = result.getInt(colName);
+                    break;
+                case FLOAT8:
+                    value = result.getDouble(colName);
+                    break;
+                case REAL:
+                    value = result.getFloat(colName);
+                    break;
+                case BIGINT:
+                    value = result.getLong(colName);
+                    break;
+                case SMALLINT:
+                    value = result.getShort(colName);
+                    break;
+                case BOOLEAN:
+                    value = result.getBoolean(colName);
+                    break;
+                case BYTEA:
+                    value = result.getBytes(colName);
+                    break;
+                case VARCHAR:
+                case BPCHAR:
+                case TEXT:
+                case NUMERIC:
+                    value = result.getString(colName);
+                    break;
+                case DATE:
+                    value = result.getDate(colName);
+                    break;
+                case TIMESTAMP:
+                    value = result.getTimestamp(colName);
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Field type '" + 
DataType.get(oneField.type).toString() + "' (column '" + column.toString() + 
"') is not supported");
+            }
+
+            oneField.val = value;
+            fields.add(oneField);
+        }
+        return fields;
+    }
+
+    /**
+     * setFields() implementation
+     *
+     * @return OneRow with the data field containing a {@code List<OneField>}.
+     * OneFields are not reordered before being passed to Accessor; at the
+     * moment, there is no way to correct the order of the fields if it is wrong.
+     * In practice, the 'record' provided is always ordered the right way.
+     *
+     * @throws UnsupportedOperationException if the type of some field is not 
supported
+     */
+    @Override
+    public OneRow setFields(List<OneField> record) throws 
UnsupportedOperationException, ParseException {
+        int column_index = 0;
+        for (OneField oneField : record) {
+            ColumnDescriptor column = columns.get(column_index);
+            if (
+                LOG.isDebugEnabled() &&
+                DataType.get(column.columnTypeCode()) != 
DataType.get(oneField.type)
+            ) {
+                LOG.warn("The provided tuple of data may be disordered. 
Datatype of column with descriptor '" + column.toString() + "' must be '" + 
DataType.get(column.columnTypeCode()).toString() + "', but actual is '" + 
DataType.get(oneField.type).toString() + "'");
+            }
+
+            // Check that data type is supported
+            switch (DataType.get(oneField.type)) {
+                case BOOLEAN:
+                case INTEGER:
+                case FLOAT8:
+                case REAL:
+                case BIGINT:
+                case SMALLINT:
+                case NUMERIC:
+                case VARCHAR:
+                case BPCHAR:
+                case TEXT:
+                case BYTEA:
+                case TIMESTAMP:
+                case DATE:
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Field type '" + 
DataType.get(oneField.type).toString() + "' (column '" + column.toString() + 
"') is not supported");
+            }
+
+            if (
+                LOG.isDebugEnabled() &&
+                DataType.get(oneField.type) == DataType.BYTEA
+            ) {
+                LOG.debug("OneField content (conversion from BYTEA): '" + new 
String((byte[])oneField.val) + "'");
+            }
+
+            // Convert TEXT columns into native data types
+            if ((DataType.get(oneField.type) == DataType.TEXT) && 
(DataType.get(column.columnTypeCode()) != DataType.TEXT)) {
+                oneField.type = column.columnTypeCode();
+                if (oneField.val != null) {
+                    String rawVal = (String)oneField.val;
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("OneField content (conversion from TEXT): '" 
+ rawVal + "'");
+                    }
+                    switch (DataType.get(column.columnTypeCode())) {
+                        case VARCHAR:
+                        case BPCHAR:
+                        case TEXT:
+                        case BYTEA:
+                            break;
+                        case BOOLEAN:
+                            oneField.val = 
(Object)Boolean.parseBoolean(rawVal);
+                            break;
+                        case INTEGER:
+                            oneField.val = (Object)Integer.parseInt(rawVal);
+                            break;
+                        case FLOAT8:
+                            oneField.val = (Object)Double.parseDouble(rawVal);
+                            break;
+                        case REAL:
+                            oneField.val = (Object)Float.parseFloat(rawVal);
+                            break;
+                        case BIGINT:
+                            oneField.val = (Object)Long.parseLong(rawVal);
+                            break;
+                        case SMALLINT:
+                            oneField.val = (Object)Short.parseShort(rawVal);
+                            break;
+                        case NUMERIC:
+                            oneField.val = (Object)new BigDecimal(rawVal);
+                            break;
+                        case TIMESTAMP:
+                            boolean isConversionSuccessful = false;
+                            for (SimpleDateFormat sdf : timestampSDFs.get()) {
+                                try {
+                                    java.util.Date parsedTimestamp = 
sdf.parse(rawVal);
+                                    oneField.val = (Object)new 
Timestamp(parsedTimestamp.getTime());
+                                    isConversionSuccessful = true;
+                                    break;
+                                }
+                                catch (ParseException e) {
+                                    // pass
+                                }
+                            }
+                            if (!isConversionSuccessful) {
+                                throw new ParseException(rawVal, 0);
+                            }
+                            break;
+                        case DATE:
+                            oneField.val = (Object)new 
Date(dateSDF.get().parse(rawVal).getTime());
+                            break;
+                        default:
+                            throw new UnsupportedOperationException("Field 
type '" + DataType.get(oneField.type).toString() + "' (column '" + 
column.toString() + "') is not supported");
+                    }
+                }
+            }
+
+            column_index += 1;
+        }
+        return new OneRow(new LinkedList<OneField>(record));
+    }
+
+    /**
+     * Decode OneRow object and pass all its contents to a PreparedStatement
+     *
+     * @throws IOException if data in a OneRow is corrupted
+     * @throws SQLException if the given statement is broken
+     */
+    @SuppressWarnings("unchecked")
+    public static void decodeOneRowToPreparedStatement(OneRow row, 
PreparedStatement statement) throws IOException, SQLException {
+        // This is safe: OneRow comes from JdbcResolver
+        List<OneField> tuple = (List<OneField>)row.getData();
+        for (int i = 1; i <= tuple.size(); i++) {
+            OneField field = tuple.get(i - 1);
+            switch (DataType.get(field.type)) {
+                case INTEGER:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.INTEGER);
+                    }
+                    else {
+                        statement.setInt(i, (int)field.val);
+                    }
+                    break;
+                case BIGINT:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.INTEGER);
+                    }
+                    else {
+                        statement.setLong(i, (long)field.val);
+                    }
+                    break;
+                case SMALLINT:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.INTEGER);
+                    }
+                    else {
+                        statement.setShort(i, (short)field.val);
+                    }
+                    break;
+                case REAL:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.FLOAT);
+                    }
+                    else {
+                        statement.setFloat(i, (float)field.val);
+                    }
+                    break;
+                case FLOAT8:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.DOUBLE);
+                    }
+                    else {
+                        statement.setDouble(i, (double)field.val);
+                    }
+                    break;
+                case BOOLEAN:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.BOOLEAN);
+                    }
+                    else {
+                        statement.setBoolean(i, (boolean)field.val);
+                    }
+                    break;
+                case NUMERIC:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.NUMERIC);
+                    }
+                    else {
+                        statement.setBigDecimal(i, (BigDecimal)field.val);
+                    }
+                    break;
+                case VARCHAR:
+                case BPCHAR:
+                case TEXT:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.VARCHAR);
+                    }
+                    else {
+                        statement.setString(i, (String)field.val);
+                    }
+                    break;
+                case BYTEA:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.BINARY);
+                    }
+                    else {
+                        statement.setBytes(i, (byte[])field.val);
+                    }
+                    break;
+                case TIMESTAMP:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.TIMESTAMP);
+                    }
+                    else {
+                        statement.setTimestamp(i, (Timestamp)field.val);
+                    }
+                    break;
+                case DATE:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.DATE);
+                    }
+                    else {
+                        statement.setDate(i, (Date)field.val);
+                    }
+                    break;
+                default:
+                    throw new IOException("The data tuple from JdbcResolver is 
corrupted");
+            }
+        }
+    }
+
+    private static final Log LOG = LogFactory.getLog(JdbcResolver.class);
+
+    // SimpleDateFormat to parse TEXT into DATE
+    private static ThreadLocal<SimpleDateFormat> dateSDF = new 
ThreadLocal<SimpleDateFormat>() {
+        @Override protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyy-MM-dd");
+        }
+    };
+    // SimpleDateFormat to parse TEXT into TIMESTAMP (with microseconds)
+    private static ThreadLocal<SimpleDateFormat[]> timestampSDFs = new 
ThreadLocal<SimpleDateFormat[]>() {
+        @Override protected SimpleDateFormat[] initialValue() {
+            SimpleDateFormat[] retRes = {
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS"),
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSS"),
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSS"),
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS"),
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S"),
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"),
+                new SimpleDateFormat("yyyy-MM-dd")
+            };
+            return retRes;
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java
 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java
index 541aa86..d6c8fba 100644
--- 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java
+++ 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java
@@ -18,9 +18,6 @@ package org.apache.hawq.pxf.plugins.jdbc;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.hawq.pxf.api.LogicalFilter;
 import org.apache.hawq.pxf.plugins.jdbc.utils.DbProduct;
 import org.apache.hawq.pxf.api.BasicFilter;
@@ -29,81 +26,99 @@ import org.apache.hawq.pxf.api.io.DataType;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.text.ParseException;
+
 /**
- * Parse filter object generated by parent class  {@link 
org.apache.hawq.pxf.plugins.jdbc.JdbcFilterBuilder},
- * and build WHERE statement.
- * For Multiple filters , currently only support HDOP_AND .
- * The unsupported Filter operation and  LogicalOperation ,will return null 
statement.
+ * A WHERE queries builder
  *
+ * Parses filter objects generated by {@link 
org.apache.hawq.pxf.plugins.jdbc.JdbcFilterBuilder} and builds WHERE statements
+ * Only HDOP_AND is supported for multiple filters
  */
 public class WhereSQLBuilder extends JdbcFilterBuilder {
-    private InputData inputData;
-
     public WhereSQLBuilder(InputData input) {
         inputData = input;
     }
 
     /**
-     * 1.check for LogicalOperator, Jdbc currently only support HDOP_AND.
-     * 2.and convert to BasicFilter List.
+     * Insert WHERE constraints into a given query
+     * Note that if filter is not supported, query is left unchanged
+     *
+     * @param dbName Database name (affects the behaviour for DATE constraints)
+     * @param query SQL query to insert constraints to. The query may 
contain other WHERE statements
+     *
+     * @throws ParseException if an error happens when parsing the constraints 
(provided to class constructor)
      */
-    private static List<BasicFilter> convertBasicFilterList(Object filter, 
List<BasicFilter> returnList) throws UnsupportedFilterException {
-        if (returnList == null)
-            returnList = new ArrayList<>();
-        if (filter instanceof BasicFilter) {
-            returnList.add((BasicFilter) filter);
-            return returnList;
+    public void buildWhereSQL(String dbName, StringBuilder query) throws 
ParseException {
+        if (!inputData.hasFilter()) {
+            return;
         }
-        LogicalFilter lfilter = (LogicalFilter) filter;
-        if (lfilter.getOperator() != FilterParser.LogicalOperation.HDOP_AND)
-            throw new UnsupportedFilterException("unsupported LogicalOperation 
: " + lfilter.getOperator());
-        for (Object f : lfilter.getFilterList()) {
-            returnList = convertBasicFilterList(f, returnList);
-        }
-        return returnList;
-    }
 
-    public String buildWhereSQL(String db_product) throws Exception {
-        if (!inputData.hasFilter())
-            return null;
-        List<BasicFilter> filters = null;
         try {
+            StringBuilder prepared = new StringBuilder();
+            if (!query.toString().contains("WHERE")) {
+                prepared.append(" WHERE ");
+            }
+            else {
+                prepared.append(" AND ");
+            }
+
+            // Get constraints and parse them
             String filterString = inputData.getFilterString();
             Object filterObj = getFilterObject(filterString);
-
+            List<BasicFilter> filters = null;
             filters = convertBasicFilterList(filterObj, filters);
-            StringBuffer sb = new StringBuffer("1=1");
+
+            String andDivisor = "";
             for (Object obj : filters) {
-                BasicFilter filter = (BasicFilter) obj;
-                sb.append(" AND ");
+                prepared.append(andDivisor);
+                andDivisor = " AND ";
 
+                // Insert constraint column name
+                BasicFilter filter = (BasicFilter) obj;
                 ColumnDescriptor column = 
inputData.getColumn(filter.getColumn().index());
-                //the column name of filter
-                sb.append(column.columnName());
+                prepared.append(column.columnName());
 
-                //the operation of filter
+                // Insert constraint operator
                 FilterParser.Operation op = filter.getOperation();
                 switch (op) {
                     case HDOP_LT:
-                        sb.append("<");
+                        prepared.append(" < ");
                         break;
                     case HDOP_GT:
-                        sb.append(">");
+                        prepared.append(" > ");
                         break;
                     case HDOP_LE:
-                        sb.append("<=");
+                        prepared.append(" <= ");
                         break;
                     case HDOP_GE:
-                        sb.append(">=");
+                        prepared.append(" >= ");
                         break;
                     case HDOP_EQ:
-                        sb.append("=");
+                        prepared.append(" = ");
+                        break;
+                    case HDOP_LIKE:
+                        prepared.append(" LIKE ");
                         break;
+                    case HDOP_NE:
+                        prepared.append(" <> ");
+                        break;
+                    case HDOP_IS_NULL:
+                        prepared.append(" IS NULL");
+                        continue;
+                    case HDOP_IS_NOT_NULL:
+                        prepared.append(" IS NOT NULL");
+                        continue;
                     default:
-                        throw new UnsupportedFilterException("unsupported 
Filter operation : " + op);
+                        throw new UnsupportedFilterException("Unsupported 
Filter operation: " + op);
                 }
 
-                DbProduct dbProduct = DbProduct.getDbProduct(db_product);
+                // Insert constraint constant
+                DbProduct dbProduct = DbProduct.getDbProduct(dbName);
                 Object val = filter.getConstant().constant();
                 switch (DataType.get(column.columnTypeCode())) {
                     case SMALLINT:
@@ -112,29 +127,68 @@ public class WhereSQLBuilder extends JdbcFilterBuilder {
                     case FLOAT8:
                     case REAL:
                     case BOOLEAN:
-                        sb.append(val.toString());
+                        prepared.append(val.toString());
                         break;
                     case TEXT:
-                        sb.append("'").append(val.toString()).append("'");
+                        
prepared.append("'").append(val.toString()).append("'");
                         break;
                     case DATE:
-                        //According to the database products, for the date 
field for special treatment.
-                        sb.append(dbProduct.wrapDate(val));
+                        // Date field has different format in different 
databases
+                        prepared.append(dbProduct.wrapDate(val));
+                        break;
+                    case TIMESTAMP:
+                        // Timestamp field has different format in different 
databases
+                        prepared.append(dbProduct.wrapTimestamp(val));
                         break;
                     default:
-                        throw new UnsupportedFilterException("unsupported 
column type for filtering : " + column.columnTypeCode());
+                        throw new UnsupportedFilterException("Unsupported 
column type for filtering: " + column.columnTypeCode());
                 }
-
             }
-            return sb.toString();
-        } catch (UnsupportedFilterException ex) {
-            return null;
+
+            // No exceptions were thrown, change the provided query
+            query.append(prepared);
+        }
+        catch (UnsupportedFilterException e) {
+            LOG.debug("WHERE clause is omitted: " + e.toString());
+            // Silence the exception and do not insert constraints
+        }
+    }
+
+    /**
+     * Convert filter object into a list of {@link BasicFilter}
+     *
+     * @param filter Filter object
+     * @param returnList A list of {@link BasicFilter} to append filters to. 
Must be null if the function is not called recursively
+     */
+    private static List<BasicFilter> convertBasicFilterList(Object filter, 
List<BasicFilter> returnList) throws UnsupportedFilterException {
+        if (returnList == null) {
+            returnList = new ArrayList<>();
+        }
+
+        if (filter instanceof BasicFilter) {
+            returnList.add((BasicFilter) filter);
+            return returnList;
+        }
+
+        LogicalFilter lfilter = (LogicalFilter) filter;
+        if (lfilter.getOperator() != FilterParser.LogicalOperation.HDOP_AND) {
+            throw new UnsupportedFilterException("Logical operation '" + 
lfilter.getOperator() + "' is not supported");
+        }
+        for (Object f : lfilter.getFilterList()) {
+            returnList = convertBasicFilterList(f, returnList);
         }
+
+        return returnList;
     }
 
-    static class UnsupportedFilterException extends Exception {
-        UnsupportedFilterException(String message) {
+    private static class UnsupportedFilterException extends Exception {
+               UnsupportedFilterException(String message) {
             super(message);
         }
     }
+
+    private static final Log LOG = LogFactory.getLog(WhereSQLBuilder.class);
+
+    // {@link InputData} from PXF
+    private InputData inputData;
 }

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java
 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java
index cdca8a6..bb79c84 100644
--- 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java
+++ 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java
@@ -19,29 +19,34 @@ package org.apache.hawq.pxf.plugins.jdbc.utils;
  * under the License.
  */
 
-
 import org.apache.commons.lang.ArrayUtils;
 
 /**
- * A tool class, used to deal with byte array merging, split and other methods.
+ * A tool class for byte array merging, splitting and conversion
  */
 public class ByteUtil {
-
     public static byte[] mergeBytes(byte[] b1, byte[] b2) {
         return ArrayUtils.addAll(b1,b2);
     }
 
-    public static byte[][] splitBytes(byte[] bytes, int n) {
-        int len = bytes.length / n;
+    /**
+     * Split a byte[] array into 8-byte arrays, each of which represents a 
value of type long
+     */
+    public static byte[][] splitBytes(byte[] bytes) {
+        final int N = 8;
+        int len = bytes.length / N;
         byte[][] newBytes = new byte[len][];
         int j = 0;
         for (int i = 0; i < len; i++) {
-            newBytes[i] = new byte[n];
-            for (int k = 0; k < n; k++) newBytes[i][k] = bytes[j++];
+            newBytes[i] = new byte[N];
+            for (int k = 0; k < N; k++) newBytes[i][k] = bytes[j++];
         }
         return newBytes;
     }
 
+    /**
+     * Convert a value of type long to a byte[] array
+     */
     public static byte[] getBytes(long value) {
         byte[] b = new byte[8];
         b[0] = (byte) ((value >> 56) & 0xFF);
@@ -55,22 +60,9 @@ public class ByteUtil {
         return b;
     }
 
-    public static byte[] getBytes(int value) {
-        byte[] b = new byte[4];
-        b[0] = (byte) ((value >> 24) & 0xFF);
-        b[1] = (byte) ((value >> 16) & 0xFF);
-        b[2] = (byte) ((value >> 8) & 0xFF);
-        b[3] = (byte) ((value >> 0) & 0xFF);
-        return b;
-    }
-
-    public static int toInt(byte[] b) {
-        return (((((int) b[3]) & 0xFF) << 32) +
-                ((((int) b[2]) & 0xFF) << 40) +
-                ((((int) b[1]) & 0xFF) << 48) +
-                ((((int) b[0]) & 0xFF) << 56));
-    }
-
+    /**
+     * Convert a byte[] array to a value of type long
+     */
     public static long toLong(byte[] b) {
         return ((((long) b[7]) & 0xFF) +
                 ((((long) b[6]) & 0xFF) << 8) +

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java
 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java
index 30ff1fe..c8b8cfb 100644
--- 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java
+++ 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java
@@ -20,14 +20,16 @@ package org.apache.hawq.pxf.plugins.jdbc.utils;
  */
 
 /**
- * As the syntax of different database products are not the same, such as the 
date type  field for processing, ORACLE use to_date () function, and mysql use 
Date () function.
- So we create this class to abstract public methods, the specific database 
products can implementation of these  methods.
+ * A tool class to process data types that must have different form in 
different databases.
+ * Such processing is required to create correct constraints (WHERE 
statements).
  */
 public abstract class DbProduct {
-    //wrap date string
-    public abstract String wrapDate(Object date_val);
-
-
+    /**
+     * Get an instance of the DbProduct implementation corresponding to the given database
+     *
+     * @param dbName a full name of the database
+     * @return a DbProduct of the required class
+     */
     public static DbProduct getDbProduct(String dbName) {
         if (dbName.toUpperCase().contains("MYSQL"))
             return new MysqlProduct();
@@ -35,15 +37,40 @@ public abstract class DbProduct {
             return new OracleProduct();
         else if (dbName.toUpperCase().contains("POSTGRES"))
             return new PostgresProduct();
+        else if (dbName.toUpperCase().contains("MICROSOFT"))
+            return new MicrosoftProduct();
         else
-            //Unsupported databases may execute errors
             return new CommonProduct();
     }
+
+    /**
+     * Wraps a given date value the way required by a target database
+     *
+     * @param val {@link java.sql.Date} object to wrap
+     * @return a string with a properly wrapped date object
+     */
+    public abstract String wrapDate(Object val);
+
+    /**
+     * Wraps a given timestamp value the way required by a target database
+     *
+     * @param val {@link java.sql.Timestamp} object to wrap
+     * @return a string with a properly wrapped timestamp object
+     */
+    public abstract String wrapTimestamp(Object val);
 }
 
+/**
+ * Common product. Used when no other products are available
+ */
 class CommonProduct extends DbProduct {
     @Override
-    public String wrapDate(Object dateVal) {
-        return "date'" + dateVal + "'";
+    public String wrapDate(Object val) {
+        return "date'" + val + "'";
+    }
+
+    @Override
+    public String wrapTimestamp(Object val) {
+        return "'" + val + "'";
     }
 }

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MicrosoftProduct.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MicrosoftProduct.java
 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MicrosoftProduct.java
new file mode 100644
index 0000000..5cec52d
--- /dev/null
+++ 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MicrosoftProduct.java
@@ -0,0 +1,35 @@
+package org.apache.hawq.pxf.plugins.jdbc.utils;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Implements methods for the Microsoft SQL server database
+ */
+public class MicrosoftProduct extends DbProduct {
+    @Override
+    public String wrapDate(Object val){
+        return "'" + val + "'";
+    }
+
+    @Override
+    public String wrapTimestamp(Object val) {
+        return "'" + val + "'";
+    }
+}

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java
 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java
index 2e60ada..27f7605 100644
--- 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java
+++ 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java
@@ -20,12 +20,16 @@ package org.apache.hawq.pxf.plugins.jdbc.utils;
  */
 
 /**
- * Implements methods for MySQL Database.
+ * Implements methods for the MySQL Database.
  */
 public class MysqlProduct extends DbProduct {
+    @Override
+    public String wrapDate(Object val){
+        return "DATE('" + val + "')";
+    }
 
     @Override
-    public String wrapDate(Object dateVal){
-        return "DATE('" + dateVal + "')";
+    public String wrapTimestamp(Object val) {
+        return "'" + val + "'";
     }
 }

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java
 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java
index b46c5f3..c2c656b 100644
--- 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java
+++ 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java
@@ -20,11 +20,16 @@ package org.apache.hawq.pxf.plugins.jdbc.utils;
  */
 
 /**
- * Implements methods for Oracle Database.
+ * Implements methods for the Oracle Database.
  */
 public class OracleProduct extends DbProduct {
     @Override
-    public String wrapDate(Object dateVal) {
-        return "to_date('" + dateVal + "','yyyy-mm-dd')";
+    public String wrapDate(Object val) {
+        return "to_date('" + val + "', 'YYYY-MM-DD')";
+    }
+
+    @Override
+    public String wrapTimestamp(Object val) {
+        return "to_timestamp('" + val + "', 'YYYY-MM-DD HH:MI:SS.FF')";
     }
 }

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java
 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java
index 901cf2e..c25ec96 100644
--- 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java
+++ 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java
@@ -20,11 +20,16 @@ package org.apache.hawq.pxf.plugins.jdbc.utils;
  */
 
 /**
- * Implements methods for Postgres Database.
+ * Implements methods for the PostgreSQL.
  */
 public class PostgresProduct extends DbProduct {
     @Override
-    public String wrapDate(Object dateVal) {
-        return "date'" + dateVal + "'";
+    public String wrapDate(Object val) {
+        return "date'" + val + "'";
+    }
+
+    @Override
+    public String wrapTimestamp(Object val) {
+        return "'" + val + "'";
     }
 }

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java
 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java
new file mode 100644
index 0000000..3e1404c
--- /dev/null
+++ 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java
@@ -0,0 +1,109 @@
+package org.apache.hawq.pxf.plugins.jdbc.writercallable;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.plugins.jdbc.JdbcResolver;
+import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * This writer makes batch INSERTs.
+ *
+ * A call() is required after a certain number of supply() calls
+ */
+class BatchWriterCallable implements WriterCallable {
+    @Override
+    public void supply(OneRow row) throws IllegalStateException {
+        if ((batchSize > 0) && (rows.size() >= batchSize)) {
+            throw new IllegalStateException("Trying to supply() a OneRow 
object to a full WriterCallable");
+        }
+        if (row == null) {
+            throw new IllegalArgumentException("Trying to supply() a null 
OneRow object");
+        }
+        rows.add(row);
+    }
+
+    @Override
+    public boolean isCallRequired() {
+        return (batchSize > 0) && (rows.size() >= batchSize);
+    }
+
+    @Override
+    public SQLException call() throws IOException, SQLException, 
ClassNotFoundException {
+        if (rows.isEmpty()) {
+            return null;
+        }
+
+        boolean statementMustBeDeleted = false;
+        if (statement == null) {
+            statement = plugin.getPreparedStatement(plugin.getConnection(), 
query);
+            statementMustBeDeleted = true;
+        }
+
+        for (OneRow row : rows) {
+            JdbcResolver.decodeOneRowToPreparedStatement(row, statement);
+            statement.addBatch();
+        }
+
+        try {
+            statement.executeBatch();
+        }
+        catch (SQLException e) {
+            return e;
+        }
+        finally {
+            rows.clear();
+            if (statementMustBeDeleted) {
+                JdbcPlugin.closeStatement(statement);
+                statement = null;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Construct a new batch writer
+     */
+    BatchWriterCallable(JdbcPlugin plugin, String query, PreparedStatement 
statement, int batchSize) {
+        if (plugin == null || query == null) {
+            throw new IllegalArgumentException("The provided JdbcPlugin or SQL 
query is null");
+        }
+
+        this.plugin = plugin;
+        this.query = query;
+        this.statement = statement;
+        this.batchSize = batchSize;
+
+        rows = new LinkedList<>();
+    }
+
+    private final JdbcPlugin plugin;
+    private final String query;
+    private PreparedStatement statement;
+    private List<OneRow> rows;
+    private final int batchSize;
+}

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java
 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java
new file mode 100644
index 0000000..63dbb29
--- /dev/null
+++ 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java
@@ -0,0 +1,102 @@
+package org.apache.hawq.pxf.plugins.jdbc.writercallable;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.plugins.jdbc.JdbcResolver;
+import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * This writer makes simple, one-by-one INSERTs.
+ *
+ * A call() is required after every supply()
+ */
+class SimpleWriterCallable implements WriterCallable {
+    @Override
+    public void supply(OneRow row) throws IllegalStateException {
+        if (this.row != null) {
+            throw new IllegalStateException("Trying to supply() a OneRow 
object to a full WriterCallable");
+        }
+        if (row == null) {
+            throw new IllegalArgumentException("Trying to supply() a null 
OneRow object");
+        }
+        this.row = row;
+    }
+
+    @Override
+    public boolean isCallRequired() {
+        return this.row != null;
+    }
+
+    @Override
+    public SQLException call() throws IOException, SQLException, 
ClassNotFoundException {
+        if (row == null) {
+            return null;
+        }
+
+        boolean statementMustBeDeleted = false;
+        if (statement == null) {
+            statement = plugin.getPreparedStatement(plugin.getConnection(), 
query);
+            statementMustBeDeleted = true;
+        }
+
+        JdbcResolver.decodeOneRowToPreparedStatement(row, statement);
+
+        try {
+            if (statement.executeUpdate() != 1) {
+                throw new SQLException("The number of rows affected by INSERT 
query is not equal to the number of rows provided");
+            }
+        }
+        catch (SQLException e) {
+            return e;
+        }
+        finally {
+            row = null;
+            if (statementMustBeDeleted) {
+                JdbcPlugin.closeStatement(statement);
+                statement = null;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Construct a new simple writer
+     */
+    SimpleWriterCallable(JdbcPlugin plugin, String query, PreparedStatement 
statement) {
+        if ((plugin == null) || (query == null)) {
+            throw new IllegalArgumentException("The provided JdbcPlugin or SQL 
query is null");
+        }
+        this.plugin = plugin;
+        this.query = query;
+        this.statement = statement;
+        row = null;
+    }
+
+    private final JdbcPlugin plugin;
+    private final String query;
+    private PreparedStatement statement;
+    private OneRow row;
+}

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallable.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallable.java
 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallable.java
new file mode 100644
index 0000000..e2a6916
--- /dev/null
+++ 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallable.java
@@ -0,0 +1,56 @@
+package org.apache.hawq.pxf.plugins.jdbc.writercallable;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+
+import java.util.concurrent.Callable;
+import java.sql.SQLException;
+
+/**
+ * An object that processes INSERT operation on {@link OneRow} objects
+ */
+public interface WriterCallable extends Callable<SQLException> {
+    /**
+     * Pass the next OneRow to this WriterCallable.
+     *
+     * @throws IllegalStateException if this WriterCallable must be call()ed 
before the next call to supply()
+     */
+    void supply(OneRow row) throws IllegalStateException;
+
+    /**
+     * Check whether this WriterCallable must be called
+     *
+     * @return true if this WriterCallable must be call()ed before the next call to supply(), false otherwise
+     */
+    boolean isCallRequired();
+
+    /**
+     * Execute an INSERT query.
+     *
+     * @return a SQLException that happened when executing the query, or null if the query succeeded or was empty (nothing was there to execute)
+     *
+     * @throws Exception an exception that happened during execution, but that 
is not related to the execution of the query itself (for instance, it may 
originate from {@link java.sql.PreparedStatement} close() method)
+     */
+    @Override
+    SQLException call() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java
 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java
new file mode 100644
index 0000000..aaf13bd
--- /dev/null
+++ 
b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java
@@ -0,0 +1,97 @@
+package org.apache.hawq.pxf.plugins.jdbc.writercallable;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
+
+import java.sql.PreparedStatement;
+
+/**
+ * An object that processes INSERT operation on {@link OneRow} objects
+ */
+public class WriterCallableFactory {
+    /**
+     * Create a new {@link WriterCallable} factory.
+     *
+     * Note that 'setPlugin' and 'setQuery' must be called before construction 
of a {@link WriterCallable}.
+     *
+     * By default, 'statement' is null
+     */
+    public WriterCallableFactory() {
+        batchSize = JdbcPlugin.DEFAULT_BATCH_SIZE;
+        plugin = null;
+        query = null;
+        statement = null;
+    }
+
+    /**
+     * Get an instance of WriterCallable
+     *
+     * @return an implementation of WriterCallable, chosen based on parameters 
that were set for this factory
+     */
+    public WriterCallable get() {
+        if (batchSize > 1) {
+            return new BatchWriterCallable(plugin, query, statement, 
batchSize);
+        }
+        return new SimpleWriterCallable(plugin, query, statement);
+    }
+
+    /**
+     * Set {@link JdbcPlugin} to use.
+     * REQUIRED
+     */
+    public void setPlugin(JdbcPlugin plugin) {
+        this.plugin = plugin;
+    }
+
+    /**
+     * Set SQL query to use.
+     * REQUIRED
+     */
+    public void setQuery(String query) {
+        this.query = query;
+    }
+
+    /**
+     * Set batch size to use
+     *
+     * @param batchSize the batch size; if greater than 1, batches of the specified size are used, otherwise batches are not used
+     */
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Set statement to use.
+     *
+     * @param statement the statement to use; if null, a new connection and a new statement are created each time {@link WriterCallable} is called; otherwise the given statement is used and is not closed or reopened
+     */
+    public void setStatement(PreparedStatement statement) {
+        this.statement = statement;
+    }
+
+    private int batchSize;
+    private JdbcPlugin plugin;
+    private String query;
+    private PreparedStatement statement;
+}

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java
 
b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java
index 6785af6..33ae585 100644
--- 
a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java
+++ 
b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java
@@ -56,7 +56,7 @@ public class JdbcPartitionFragmenterTest {
 
         //fragment - 1
         byte[] fragMeta = fragments.get(0).getMetadata();
-        byte[][] newBytes = ByteUtil.splitBytes(fragMeta, 8);
+        byte[][] newBytes = ByteUtil.splitBytes(fragMeta);
         long fragStart = ByteUtil.toLong(newBytes[0]);
         long fragEnd = ByteUtil.toLong(newBytes[1]);
         assertDateEquals(fragStart, 2008, 1, 1);
@@ -64,7 +64,7 @@ public class JdbcPartitionFragmenterTest {
 
         //fragment - 12
         fragMeta = fragments.get(11).getMetadata();
-        newBytes = ByteUtil.splitBytes(fragMeta, 8);
+        newBytes = ByteUtil.splitBytes(fragMeta);
         fragStart = ByteUtil.toLong(newBytes[0]);
         fragEnd = ByteUtil.toLong(newBytes[1]);
         assertDateEquals(fragStart, 2008, 12, 1);
@@ -102,17 +102,17 @@ public class JdbcPartitionFragmenterTest {
 
         //fragment - 1
         byte[] fragMeta = fragments.get(0).getMetadata();
-        byte[][] newBytes = ByteUtil.splitBytes(fragMeta, 4);
-        int fragStart = ByteUtil.toInt(newBytes[0]);
-        int fragEnd = ByteUtil.toInt(newBytes[1]);
+        byte[][] newBytes = ByteUtil.splitBytes(fragMeta);
+        long fragStart = ByteUtil.toLong(newBytes[0]);
+        long fragEnd = ByteUtil.toLong(newBytes[1]);
         assertEquals(2001, fragStart);
         assertEquals(2003, fragEnd);
 
         //fragment - 6
         fragMeta = fragments.get(5).getMetadata();
-        newBytes = ByteUtil.splitBytes(fragMeta, 4);
-        fragStart = ByteUtil.toInt(newBytes[0]);
-        fragEnd = ByteUtil.toInt(newBytes[1]);
+        newBytes = ByteUtil.splitBytes(fragMeta);
+        fragStart = ByteUtil.toLong(newBytes[0]);
+        fragEnd = ByteUtil.toLong(newBytes[1]);
         assertEquals(2011, fragStart);
         assertEquals(2012, fragEnd);
 

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java
 
b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java
index ebe367d..de173b8 100644
--- 
a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java
+++ 
b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java
@@ -59,44 +59,53 @@ public class SqlBuilderTest {
     public void testIdFilter() throws Exception {
         prepareConstruction();
         when(inputData.hasFilter()).thenReturn(true);
-        when(inputData.getFilterString()).thenReturn("a0c20s1d1o5");//id=1
+        // id = 1
+        when(inputData.getFilterString()).thenReturn("a0c20s1d1o5");
 
         WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
-        assertEquals("1=1 AND id=1", builder.buildWhereSQL(DB_PRODUCT));
+        StringBuilder sb = new StringBuilder();
+        builder.buildWhereSQL(DB_PRODUCT, sb);
+        assertEquals(" WHERE id = 1", sb.toString());
     }
 
     @Test
     public void testDateAndAmtFilter() throws Exception {
         prepareConstruction();
         when(inputData.hasFilter()).thenReturn(true);
-        // cdate>'2008-02-01' and cdate<'2008-12-01' and amt > 1200
+        // cdate > '2008-02-01' and cdate < '2008-12-01' and amt > 1200
         
when(inputData.getFilterString()).thenReturn("a1c25s10d2008-02-01o2a1c25s10d2008-12-01o1l0a2c20s4d1200o2l0");
 
         WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
-        assertEquals("1=1 AND cdate>DATE('2008-02-01') AND 
cdate<DATE('2008-12-01') AND amt>1200"
-                , builder.buildWhereSQL(DB_PRODUCT));
+        StringBuilder sb = new StringBuilder();
+        builder.buildWhereSQL(DB_PRODUCT, sb);
+        assertEquals(" WHERE cdate > DATE('2008-02-01') AND cdate < 
DATE('2008-12-01') AND amt > 1200"
+                , sb.toString());
     }
 
     @Test
     public void testUnsupportedOperationFilter() throws Exception {
         prepareConstruction();
         when(inputData.hasFilter()).thenReturn(true);
-        // grade like 'bad'
-        when(inputData.getFilterString()).thenReturn("a3c25s3dbado7");
+        // IN 'bad'
+        when(inputData.getFilterString()).thenReturn("a3c25s3dbado10");
 
         WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
-        assertEquals(null, builder.buildWhereSQL(DB_PRODUCT));
+        StringBuilder sb = new StringBuilder();
+        builder.buildWhereSQL(DB_PRODUCT, sb);
+        assertEquals("", sb.toString());
     }
 
     @Test
     public void testUnsupportedLogicalFilter() throws Exception {
         prepareConstruction();
         when(inputData.hasFilter()).thenReturn(true);
-        // cdate>'2008-02-01' or amt < 1200
+        // cdate > '2008-02-01' or amt < 1200
         
when(inputData.getFilterString()).thenReturn("a1c25s10d2008-02-01o2a2c20s4d1200o2l1");
 
         WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
-        assertEquals(null, builder.buildWhereSQL(DB_PRODUCT));
+        StringBuilder sb = new StringBuilder();
+        builder.buildWhereSQL(DB_PRODUCT, sb);
+        assertEquals("", sb.toString());
     }
 
     @Test
@@ -110,11 +119,11 @@ public class SqlBuilderTest {
         List<Fragment> fragments = fragment.getFragments();
         assertEquals(6, fragments.size());
 
-        //partition-1 : cdate>=2008-01-01 and cdate<2008-03-01
+        // Partition: cdate >= 2008-01-01 and cdate < 2008-03-01
         
when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata());
-        String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, 
ORIGINAL_SQL);
-        assertEquals(ORIGINAL_SQL + " WHERE 1=1  AND " +
-                "cdate >= DATE('2008-01-01') AND cdate < DATE('2008-03-01')", 
fragmentSql);
+        StringBuilder sb = new StringBuilder(ORIGINAL_SQL);
+        JdbcPartitionFragmenter.buildFragmenterSql(inputData, DB_PRODUCT, sb);
+        assertEquals(ORIGINAL_SQL + " WHERE cdate >= DATE('2008-01-01') AND 
cdate < DATE('2008-03-01')", sb.toString());
     }
 
     @Test
@@ -125,19 +134,19 @@ public class SqlBuilderTest {
         
when(inputData.getUserProperty("PARTITION_BY")).thenReturn("grade:enum");
         
when(inputData.getUserProperty("RANGE")).thenReturn("excellent:good:general:bad");
 
+        StringBuilder sb = new StringBuilder(ORIGINAL_SQL);
         WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
-        String whereSql = builder.buildWhereSQL(DB_PRODUCT);
-        assertEquals("1=1 AND id>5", whereSql);
+        builder.buildWhereSQL(DB_PRODUCT, sb);
+        assertEquals(ORIGINAL_SQL + " WHERE id > 5", sb.toString());
 
         JdbcPartitionFragmenter fragment = new 
JdbcPartitionFragmenter(inputData);
         List<Fragment> fragments = fragment.getFragments();
 
-        //partition-1 : id>5 and grade='excellent'
+        // Partition: id > 5 and grade = 'excellent'
         
when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata());
 
-        String filterSql = ORIGINAL_SQL + " WHERE " + whereSql;
-        String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, 
filterSql);
-        assertEquals(filterSql + " AND grade='excellent'", fragmentSql);
+        JdbcPartitionFragmenter.buildFragmenterSql(inputData, DB_PRODUCT, sb);
+        assertEquals(ORIGINAL_SQL + " WHERE id > 5 AND grade = 'excellent'", 
sb.toString());
     }
 
     @Test
@@ -150,8 +159,9 @@ public class SqlBuilderTest {
 
         
when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata());
 
-        String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, 
ORIGINAL_SQL);
-        assertEquals(ORIGINAL_SQL, fragmentSql);
+        StringBuilder sb = new StringBuilder(ORIGINAL_SQL);
+        JdbcPartitionFragmenter.buildFragmenterSql(inputData, DB_PRODUCT, sb);
+        assertEquals(ORIGINAL_SQL, sb.toString());
     }
 
 

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml 
b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
index 252791e..0cddad1 100644
--- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
+++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
@@ -177,11 +177,11 @@ under the License.
     </profile>
     <profile>
         <name>Jdbc</name>
-        <description>A profile for reading data into HAWQ via 
JDBC</description>
+        <description>A profile for reading and writing data via 
JDBC</description>
         <plugins>
             
<fragmenter>org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter</fragmenter>
-            
<accessor>org.apache.hawq.pxf.plugins.jdbc.JdbcReadAccessor</accessor>
-            
<resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver</resolver>
+            <accessor>org.apache.hawq.pxf.plugins.jdbc.JdbcAccessor</accessor>
+            <resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcResolver</resolver>
         </plugins>
     </profile>
     <profile>

Reply via email to