Repository: nifi
Updated Branches:
  refs/heads/master b4a9f52a4 -> b5ca7adbb


NIFI-978: Support parameterized statements in ExecuteSQL

Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>

This closes #2433.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b5ca7adb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b5ca7adb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b5ca7adb

Branch: refs/heads/master
Commit: b5ca7adbb97c603cbc721e105c4fe279cdcb085b
Parents: b4a9f52
Author: Matthew Burgess <mattyb...@apache.org>
Authored: Thu Jan 25 10:24:17 2018 -0500
Committer: Pierre Villard <pierre.villard...@gmail.com>
Committed: Mon Feb 12 20:44:31 2018 +0100

----------------------------------------------------------------------
 .../nifi/processors/standard/ExecuteSQL.java    |  83 ++++---
 .../apache/nifi/processors/standard/PutSQL.java | 249 +------------------
 .../processors/standard/util/JdbcCommon.java    | 242 ++++++++++++++++++
 .../processors/standard/TestExecuteSQL.java     |  34 ++-
 4 files changed, 319 insertions(+), 289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b5ca7adb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 2104126..cb9388f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -16,13 +16,11 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.nio.charset.Charset;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -35,6 +33,8 @@ import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -50,8 +50,6 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
@@ -64,11 +62,32 @@ import static 
org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
 @EventDriven
 @InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"sql", "select", "jdbc", "query", "database"})
-@CapabilityDescription("Execute provided SQL select query. Query result will 
be converted to Avro format."
+@CapabilityDescription("Executes provided SQL select query. Query result will 
be converted to Avro format."
         + " Streaming is used so arbitrarily large result sets are supported. 
This processor can be scheduled to run on "
         + "a timer, or cron expression, using the standard scheduling methods, 
or it can be triggered by an incoming FlowFile. "
         + "If it is triggered by an incoming FlowFile, then attributes of that 
FlowFile will be available when evaluating the "
-        + "select query. FlowFile attribute 'executesql.row.count' indicates 
how many rows were selected.")
+        + "select query, and the query may use the ? to escape parameters. In 
this case, the parameters to use must exist as FlowFile attributes "
+        + "with the naming convention sql.args.N.type and sql.args.N.value, 
where N is a positive integer. The sql.args.N.type is expected to be "
+        + "a number indicating the JDBC Type. The content of the FlowFile is 
expected to be in UTF-8 format. "
+        + "FlowFile attribute 'executesql.row.count' indicates how many rows 
were selected.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming 
FlowFiles are expected to be parametrized SQL statements. The type of each 
Parameter is specified as an integer "
+                + "that represents the JDBC Type of the parameter."),
+        @ReadsAttribute(attribute = "sql.args.N.value", description = 
"Incoming FlowFiles are expected to be parametrized SQL statements. The value 
of the Parameters are specified as "
+                + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and 
so on. The type of the sql.args.1.value Parameter is specified by the 
sql.args.1.type attribute."),
+        @ReadsAttribute(attribute = "sql.args.N.format", description = "This 
attribute is always optional, but default options may not always work for your 
data. "
+                + "Incoming FlowFiles are expected to be parametrized SQL 
statements. In some cases "
+                + "a format option needs to be specified, currently this is 
only applicable for binary data types, dates, times and timestamps. Binary Data 
Types (defaults to 'ascii') - "
+                + "ascii: each string character in your attribute value 
represents a single byte. This is the format provided by Avro Processors. "
+                + "base64: the string is a Base64 encoded string that can be 
decoded to bytes. "
+                + "hex: the string is hex encoded with all letters in upper 
case and no '0x' at the beginning. "
+                + "Dates/Times/Timestamps - "
+                + "Date, Time and Timestamp formats all support both custom 
formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') "
+                + "as specified according to 
java.time.format.DateTimeFormatter. "
+                + "If not specified, a long value input is expected to be an 
unix epoch (milli seconds from 1970/1/1), or a string value in "
+                + "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some 
database engines e.g. Derby or MySQL do not support milliseconds and will 
truncate milliseconds), "
+                + "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")
+})
 @WritesAttributes({
     @WritesAttribute(attribute="executesql.row.count", description = "Contains 
the number of rows returned in the select query"),
     @WritesAttribute(attribute="executesql.query.duration", description = 
"Duration of the query in milliseconds")
@@ -187,22 +206,20 @@ public class ExecuteSQL extends AbstractProcessor {
             // If the query is not set, then an incoming flow file is 
required, and expected to contain a valid SQL select query.
             // If there is no incoming connection, onTrigger will not be 
called as the processor will fail when scheduled.
             final StringBuilder queryContents = new StringBuilder();
-            session.read(fileToProcess, new InputStreamCallback() {
-                @Override
-                public void process(InputStream in) throws IOException {
-                    queryContents.append(IOUtils.toString(in));
-                }
-            });
+            session.read(fileToProcess, in -> 
queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
             selectQuery = queryContents.toString();
         }
 
         int resultCount=0;
         try (final Connection con = dbcpService.getConnection();
-            final Statement st = con.createStatement()) {
+            final PreparedStatement st = con.prepareStatement(selectQuery)) {
             st.setQueryTimeout(queryTimeout); // timeout in seconds
 
+            if (fileToProcess != null) {
+                JdbcCommon.setParameters(st, fileToProcess.getAttributes());
+            }
             logger.debug("Executing query {}", new Object[]{selectQuery});
-            boolean results = st.execute(selectQuery);
+            boolean results = st.execute();
 
 
             while(results){
@@ -215,22 +232,19 @@ public class ExecuteSQL extends AbstractProcessor {
                 }
 
                 final AtomicLong nrOfRows = new AtomicLong(0L);
-                resultSetFF = session.write(resultSetFF, new 
OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream out) throws 
IOException {
-                        try {
+                resultSetFF = session.write(resultSetFF, out -> {
+                    try {
 
-                            final ResultSet resultSet = st.getResultSet();
-                            final JdbcCommon.AvroConversionOptions options = 
JdbcCommon.AvroConversionOptions.builder()
-                                    .convertNames(convertNamesForAvro)
-                                    .useLogicalTypes(useAvroLogicalTypes)
-                                    .defaultPrecision(defaultPrecision)
-                                    .defaultScale(defaultScale)
-                                    .build();
-                            
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
-                        } catch (final SQLException e) {
-                            throw new ProcessException(e);
-                        }
+                        final ResultSet resultSet = st.getResultSet();
+                        final JdbcCommon.AvroConversionOptions options = 
JdbcCommon.AvroConversionOptions.builder()
+                                .convertNames(convertNamesForAvro)
+                                .useLogicalTypes(useAvroLogicalTypes)
+                                .defaultPrecision(defaultPrecision)
+                                .defaultScale(defaultScale)
+                                .build();
+                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, 
out, options, null));
+                    } catch (final SQLException e) {
+                        throw new ProcessException(e);
                     }
                 });
 
@@ -261,12 +275,7 @@ public class ExecuteSQL extends AbstractProcessor {
                 if(resultCount > 0){
                     session.remove(fileToProcess);
                 } else {
-                    fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
-                        @Override
-                        public void process(OutputStream out) throws 
IOException {
-                            JdbcCommon.createEmptyAvroStream(out);
-                        }
-                    });
+                    fileToProcess = session.write(fileToProcess, 
JdbcCommon::createEmptyAvroStream);
 
                     session.transfer(fileToProcess, REL_SUCCESS);
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5ca7adb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index b50dcd0..fd9501b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -48,37 +48,19 @@ import 
org.apache.nifi.processor.util.pattern.PartialFunctions.FlowFileGroup;
 import org.apache.nifi.processor.util.pattern.PutGroup;
 import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
 import org.apache.nifi.processor.util.pattern.RoutingResult;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.stream.io.StreamUtils;
 
-import javax.xml.bind.DatatypeConverter;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.StringReader;
-import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.sql.BatchUpdateException;
 import java.sql.Connection;
-import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.SQLDataException;
 import java.sql.SQLException;
 import java.sql.SQLNonTransientException;
 import java.sql.Statement;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.time.temporal.TemporalAccessor;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Comparator;
@@ -89,8 +71,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import static 
org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError;
 
@@ -197,15 +177,10 @@ public class PutSQL extends 
AbstractSessionFactoryProcessor {
                     + "such as an invalid query or an integrity constraint 
violation")
             .build();
 
-    private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = 
Pattern.compile("sql\\.args\\.(\\d+)\\.type");
-    private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
-
     private static final String FRAGMENT_ID_ATTR = 
FragmentAttributes.FRAGMENT_ID.key();
     private static final String FRAGMENT_INDEX_ATTR = 
FragmentAttributes.FRAGMENT_INDEX.key();
     private static final String FRAGMENT_COUNT_ATTR = 
FragmentAttributes.FRAGMENT_COUNT.key();
 
-    private static final Pattern LONG_PATTERN = 
Pattern.compile("^-?\\d{1,19}$");
-
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -310,7 +285,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor 
{
 
             if(!exceptionHandler.execute(fc, flowFile, input -> {
                 final PreparedStatement stmt = 
enclosure.getCachedStatement(conn);
-                setParameters(stmt, flowFile.getAttributes());
+                JdbcCommon.setParameters(stmt, flowFile.getAttributes());
                 stmt.addBatch();
             }, onFlowFileError(context, session, result))) {
                 continue;
@@ -383,7 +358,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor 
{
                     try (final PreparedStatement stmt = 
targetEnclosure.getNewStatement(conn, fc.obtainKeys)) {
 
                         // set the appropriate parameters on the statement.
-                        setParameters(stmt, flowFile.getAttributes());
+                        JdbcCommon.setParameters(stmt, 
flowFile.getAttributes());
 
                         stmt.executeUpdate();
 
@@ -677,46 +652,6 @@ public class PutSQL extends 
AbstractSessionFactoryProcessor {
         return sql;
     }
 
-
-    /**
-     * Sets all of the appropriate parameters on the given PreparedStatement, 
based on the given FlowFile attributes.
-     *
-     * @param stmt the statement to set the parameters on
-     * @param attributes the attributes from which to derive parameter 
indices, values, and types
-     * @throws SQLException if the PreparedStatement throws a SQLException 
when the appropriate setter is called
-     */
-    private void setParameters(final PreparedStatement stmt, final Map<String, 
String> attributes) throws SQLException {
-        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
-            final String key = entry.getKey();
-            final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
-            if (matcher.matches()) {
-                final int parameterIndex = Integer.parseInt(matcher.group(1));
-
-                final boolean isNumeric = 
NUMBER_PATTERN.matcher(entry.getValue()).matches();
-                if (!isNumeric) {
-                    throw new SQLDataException("Value of the " + key + " 
attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral 
type");
-                }
-
-                final int jdbcType = Integer.parseInt(entry.getValue());
-                final String valueAttrName = "sql.args." + parameterIndex + 
".value";
-                final String parameterValue = attributes.get(valueAttrName);
-                final String formatAttrName = "sql.args." + parameterIndex + 
".format";
-                final String parameterFormat = 
attributes.containsKey(formatAttrName)? attributes.get(formatAttrName):"";
-
-                try {
-                    setParameter(stmt, valueAttrName, parameterIndex, 
parameterValue, jdbcType, parameterFormat);
-                } catch (final NumberFormatException nfe) {
-                    throw new SQLDataException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted into 
the necessary data type", nfe);
-                } catch (ParseException pe) {
-                    throw new SQLDataException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted to a 
timestamp", pe);
-                } catch (UnsupportedEncodingException uee) {
-                    throw new SQLDataException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted to 
UTF-8", uee);
-                }
-            }
-        }
-    }
-
-
     /**
      * Determines which relationship the given FlowFiles should go to, based 
on a transaction timing out or
      * transaction information not being present. If the FlowFiles should be 
processed and not transferred
@@ -810,183 +745,7 @@ public class PutSQL extends 
AbstractSessionFactoryProcessor {
         return false;  // not enough FlowFiles for this transaction. Return 
them all to queue.
     }
 
-    /**
-     * Determines how to map the given value to the appropriate JDBC data type 
and sets the parameter on the
-     * provided PreparedStatement
-     *
-     * @param stmt the PreparedStatement to set the parameter on
-     * @param attrName the name of the attribute that the parameter is coming 
from - for logging purposes
-     * @param parameterIndex the index of the SQL parameter to set
-     * @param parameterValue the value of the SQL parameter to set
-     * @param jdbcType the JDBC Type of the SQL parameter to set
-     * @throws SQLException if the PreparedStatement throws a SQLException 
when calling the appropriate setter
-     */
-    private void setParameter(final PreparedStatement stmt, final String 
attrName, final int parameterIndex, final String parameterValue, final int 
jdbcType,
-                              final String valueFormat)
-            throws SQLException, ParseException, UnsupportedEncodingException {
-        if (parameterValue == null) {
-            stmt.setNull(parameterIndex, jdbcType);
-        } else {
-            switch (jdbcType) {
-                case Types.BIT:
-                    stmt.setBoolean(parameterIndex, "1".equals(parameterValue) 
|| "t".equalsIgnoreCase(parameterValue) || 
Boolean.parseBoolean(parameterValue));
-                     break;
-                case Types.BOOLEAN:
-                    stmt.setBoolean(parameterIndex, 
Boolean.parseBoolean(parameterValue));
-                    break;
-                case Types.TINYINT:
-                    stmt.setByte(parameterIndex, 
Byte.parseByte(parameterValue));
-                    break;
-                case Types.SMALLINT:
-                    stmt.setShort(parameterIndex, 
Short.parseShort(parameterValue));
-                    break;
-                case Types.INTEGER:
-                    stmt.setInt(parameterIndex, 
Integer.parseInt(parameterValue));
-                    break;
-                case Types.BIGINT:
-                    stmt.setLong(parameterIndex, 
Long.parseLong(parameterValue));
-                    break;
-                case Types.REAL:
-                    stmt.setFloat(parameterIndex, 
Float.parseFloat(parameterValue));
-                    break;
-                case Types.FLOAT:
-                case Types.DOUBLE:
-                    stmt.setDouble(parameterIndex, 
Double.parseDouble(parameterValue));
-                    break;
-                case Types.DECIMAL:
-                case Types.NUMERIC:
-                    stmt.setBigDecimal(parameterIndex, new 
BigDecimal(parameterValue));
-                    break;
-                case Types.DATE:
-                    Date date;
-
-                    if (valueFormat.equals("")) {
-                        if(LONG_PATTERN.matcher(parameterValue).matches()){
-                            date = new Date(Long.parseLong(parameterValue));
-                        }else {
-                            String dateFormatString = "yyyy-MM-dd";
-                            SimpleDateFormat dateFormat = new 
SimpleDateFormat(dateFormatString);
-                            java.util.Date parsedDate = 
dateFormat.parse(parameterValue);
-                            date = new Date(parsedDate.getTime());
-                        }
-                    } else {
-                        final DateTimeFormatter dtFormatter = 
getDateTimeFormatter(valueFormat);
-                        LocalDate parsedDate = LocalDate.parse(parameterValue, 
dtFormatter);
-                        date = new 
Date(Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime());
-                    }
-
-                    stmt.setDate(parameterIndex, date);
-                    break;
-                case Types.TIME:
-                    Time time;
-
-                    if (valueFormat.equals("")) {
-                        if (LONG_PATTERN.matcher(parameterValue).matches()) {
-                            time = new Time(Long.parseLong(parameterValue));
-                        } else {
-                            String timeFormatString = "HH:mm:ss.SSS";
-                            SimpleDateFormat dateFormat = new 
SimpleDateFormat(timeFormatString);
-                            java.util.Date parsedDate = 
dateFormat.parse(parameterValue);
-                            time = new Time(parsedDate.getTime());
-                        }
-                    } else {
-                        final DateTimeFormatter dtFormatter = 
getDateTimeFormatter(valueFormat);
-                        LocalTime parsedTime = LocalTime.parse(parameterValue, 
dtFormatter);
-                        LocalDateTime localDateTime = 
parsedTime.atDate(LocalDate.ofEpochDay(0));
-                        Instant instant = 
localDateTime.atZone(ZoneId.systemDefault()).toInstant();
-                        time = new Time(instant.toEpochMilli());
-                    }
-
-                    stmt.setTime(parameterIndex, time);
-                    break;
-                case Types.TIMESTAMP:
-                    long lTimestamp=0L;
-
-                    // Backwards compatibility note: Format was unsupported 
for a timestamp field.
-                    if (valueFormat.equals("")) {
-                        if(LONG_PATTERN.matcher(parameterValue).matches()){
-                            lTimestamp = Long.parseLong(parameterValue);
-                        } else {
-                            final SimpleDateFormat dateFormat  = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-                            java.util.Date parsedDate = 
dateFormat.parse(parameterValue);
-                            lTimestamp = parsedDate.getTime();
-                        }
-                    } else {
-                        final DateTimeFormatter dtFormatter = 
getDateTimeFormatter(valueFormat);
-                        TemporalAccessor accessor = 
dtFormatter.parse(parameterValue);
-                        java.util.Date parsedDate = 
java.util.Date.from(Instant.from(accessor));
-                        lTimestamp = parsedDate.getTime();
-                    }
-
-                    stmt.setTimestamp(parameterIndex, new 
Timestamp(lTimestamp));
-
-                    break;
-                case Types.BINARY:
-                case Types.VARBINARY:
-                case Types.LONGVARBINARY:
-                    byte[] bValue;
-
-                    switch(valueFormat){
-                        case "":
-                        case "ascii":
-                            bValue = parameterValue.getBytes("ASCII");
-                            break;
-                        case "hex":
-                            bValue = 
DatatypeConverter.parseHexBinary(parameterValue);
-                            break;
-                        case "base64":
-                            bValue = 
DatatypeConverter.parseBase64Binary(parameterValue);
-                            break;
-                        default:
-                            throw new ParseException("Unable to parse binary 
data using the formatter `" + valueFormat + "`.",0);
-                    }
 
-                    stmt.setBinaryStream(parameterIndex, new 
ByteArrayInputStream(bValue), bValue.length);
-
-                    break;
-                case Types.CHAR:
-                case Types.VARCHAR:
-                case Types.LONGNVARCHAR:
-                case Types.LONGVARCHAR:
-                    stmt.setString(parameterIndex, parameterValue);
-                    break;
-                case Types.CLOB:
-                    try (final StringReader reader = new 
StringReader(parameterValue)) {
-                        stmt.setCharacterStream(parameterIndex, reader);
-                    }
-                    break;
-                case Types.NCLOB:
-                    try (final StringReader reader = new 
StringReader(parameterValue)) {
-                        stmt.setNCharacterStream(parameterIndex, reader);
-                    }
-                    break;
-                default:
-                    stmt.setObject(parameterIndex, parameterValue, jdbcType);
-                    break;
-            }
-        }
-    }
-
-    private DateTimeFormatter getDateTimeFormatter(String pattern) {
-        switch(pattern) {
-            case "BASIC_ISO_DATE": return DateTimeFormatter.BASIC_ISO_DATE;
-            case "ISO_LOCAL_DATE": return DateTimeFormatter.ISO_LOCAL_DATE;
-            case "ISO_OFFSET_DATE": return DateTimeFormatter.ISO_OFFSET_DATE;
-            case "ISO_DATE": return DateTimeFormatter.ISO_DATE;
-            case "ISO_LOCAL_TIME": return DateTimeFormatter.ISO_LOCAL_TIME;
-            case "ISO_OFFSET_TIME": return DateTimeFormatter.ISO_OFFSET_TIME;
-            case "ISO_TIME": return DateTimeFormatter.ISO_TIME;
-            case "ISO_LOCAL_DATE_TIME": return 
DateTimeFormatter.ISO_LOCAL_DATE_TIME;
-            case "ISO_OFFSET_DATE_TIME": return 
DateTimeFormatter.ISO_OFFSET_DATE_TIME;
-            case "ISO_ZONED_DATE_TIME": return 
DateTimeFormatter.ISO_ZONED_DATE_TIME;
-            case "ISO_DATE_TIME": return DateTimeFormatter.ISO_DATE_TIME;
-            case "ISO_ORDINAL_DATE": return DateTimeFormatter.ISO_ORDINAL_DATE;
-            case "ISO_WEEK_DATE": return DateTimeFormatter.ISO_WEEK_DATE;
-            case "ISO_INSTANT": return DateTimeFormatter.ISO_INSTANT;
-            case "RFC_1123_DATE_TIME": return 
DateTimeFormatter.RFC_1123_DATE_TIME;
-            default: return DateTimeFormatter.ofPattern(pattern);
-        }
-    }
 
     /**
      * A FlowFileFilter that is responsible for ensuring that the FlowFiles 
returned either belong
@@ -1038,7 +797,7 @@ public class PutSQL extends 
AbstractSessionFactoryProcessor {
             if (selectedId.equals(fragmentId)) {
                 // fragment id's match. Find out if we have all of the 
necessary fragments or not.
                 final int numFragments;
-                if (fragCount != null && 
NUMBER_PATTERN.matcher(fragCount).matches()) {
+                if (fragCount != null && 
JdbcCommon.NUMBER_PATTERN.matcher(fragCount).matches()) {
                     numFragments = Integer.parseInt(fragCount);
                 } else {
                     numFragments = Integer.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5ca7adb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index f8e88ef..1cee441 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -45,10 +45,13 @@ import static java.sql.Types.TINYINT;
 import static java.sql.Types.VARBINARY;
 import static java.sql.Types.VARCHAR;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Reader;
+import java.io.StringReader;
+import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
@@ -56,11 +59,28 @@ import java.nio.CharBuffer;
 import java.sql.Blob;
 import java.sql.Clob;
 import java.sql.NClob;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
+import java.sql.SQLDataException;
 import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAccessor;
 import java.util.Date;
+import java.util.Map;
 import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -79,6 +99,8 @@ import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.processor.util.StandardValidators;
 
+import javax.xml.bind.DatatypeConverter;
+
 /**
  * JDBC / SQL common functions.
  */
@@ -90,6 +112,10 @@ public class JdbcCommon {
     private static final int DEFAULT_PRECISION_VALUE = 10;
     private static final int DEFAULT_SCALE_VALUE = 0;
 
+    public static final Pattern LONG_PATTERN = 
Pattern.compile("^-?\\d{1,19}$");
+    public static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = 
Pattern.compile("sql\\.args\\.(\\d+)\\.type");
+    public static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
+
     public static final String MIME_TYPE_AVRO_BINARY = 
"application/avro-binary";
 
     public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new 
PropertyDescriptor.Builder()
@@ -609,6 +635,222 @@ public class JdbcCommon {
     }
 
     /**
+     * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes.
+     * <p>
+     * Every attribute named {@code sql.args.N.type} declares parameter {@code N}: its value is the numeric JDBC type
+     * code, {@code sql.args.N.value} supplies the value (a missing value attribute results in SQL NULL), and the
+     * optional {@code sql.args.N.format} supplies a format hint that is handed to
+     * {@link #setParameter(PreparedStatement, String, int, String, int, String)}.
+     *
+     * @param stmt the statement to set the parameters on
+     * @param attributes the attributes from which to derive parameter indices, values, and types
+     * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called, or
+     *                      (as a {@link SQLDataException}) if a type, value or format attribute cannot be interpreted
+     */
+    public static void setParameters(final PreparedStatement stmt, final Map<String, String> attributes) throws SQLException {
+        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+            final String key = entry.getKey();
+            final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
+            if (!matcher.matches()) {
+                // Only the sql.args.N.type attributes drive the binding; value/format are looked up from them.
+                continue;
+            }
+
+            final String typeValue = entry.getValue();
+            if (typeValue == null || !NUMBER_PATTERN.matcher(typeValue).matches()) {
+                throw new SQLDataException("Value of the " + key + " attribute is '" + typeValue + "', which is not a valid JDBC numeral type");
+            }
+
+            final int parameterIndex;
+            final int jdbcType;
+            try {
+                // Both patterns accept arbitrarily long digit runs, so either number may still overflow an int.
+                parameterIndex = Integer.parseInt(matcher.group(1));
+                jdbcType = Integer.parseInt(typeValue);
+            } catch (final NumberFormatException nfe) {
+                throw new SQLDataException("The " + key + " attribute with value '" + typeValue
+                        + "' has a parameter index or JDBC type that does not fit into an integer", nfe);
+            }
+
+            final String valueAttrName = "sql.args." + parameterIndex + ".value";
+            final String parameterValue = attributes.get(valueAttrName);
+            final String formatAttrName = "sql.args." + parameterIndex + ".format";
+            final String formatValue = attributes.get(formatAttrName);
+            // An absent (or null-valued) format attribute means "use the default handling".
+            final String parameterFormat = formatValue == null ? "" : formatValue;
+
+            try {
+                JdbcCommon.setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType, parameterFormat);
+            } catch (final NumberFormatException nfe) {
+                throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe);
+            } catch (final ParseException pe) {
+                throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe);
+            } catch (final UnsupportedEncodingException uee) {
+                throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to UTF-8", uee);
+            } catch (final java.time.DateTimeException | IllegalArgumentException e) {
+                // java.time parsing, hex decoding and invalid format patterns fail with unchecked exceptions;
+                // report them as bad data like every other conversion failure instead of letting them escape.
+                throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue
+                        + "', which cannot be converted using the format '" + parameterFormat + "'", e);
+            }
+        }
+    }
+
+    /**
+     * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the
+     * provided PreparedStatement.
+     * <p>
+     * A {@code null} value is set as SQL NULL of the given type. For DATE, TIME and TIMESTAMP an empty format means
+     * the value is either epoch milliseconds or text in the default pattern ({@code yyyy-MM-dd},
+     * {@code HH:mm:ss.SSS} and {@code yyyy-MM-dd HH:mm:ss.SSS} respectively); any other format is resolved through
+     * {@link #getDateTimeFormatter(String)}. For the binary types the format selects how the text is decoded:
+     * {@code ascii} (also used when the format is empty), {@code hex} or {@code base64}.
+     *
+     * @param stmt the PreparedStatement to set the parameter on
+     * @param attrName the name of the attribute that the parameter is coming from (not used by this method itself)
+     * @param parameterIndex the index of the SQL parameter to set
+     * @param parameterValue the value of the SQL parameter to set
+     * @param jdbcType the JDBC Type of the SQL parameter to set
+     * @param valueFormat the format hint for temporal and binary values; {@code null} is treated like an empty string
+     * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter
+     * @throws ParseException if a temporal value does not match the default pattern, or the binary format is unknown
+     * @throws UnsupportedEncodingException kept in the signature for callers that catch it; the ASCII charset used
+     *                                      here is always available
+     */
+    public static void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType,
+                              final String valueFormat)
+            throws SQLException, ParseException, UnsupportedEncodingException {
+        if (parameterValue == null) {
+            stmt.setNull(parameterIndex, jdbcType);
+            return;
+        }
+
+        // Treat a missing format like an empty one so the checks and the switch below never see null.
+        final String format = valueFormat == null ? "" : valueFormat;
+
+        switch (jdbcType) {
+            case Types.BIT:
+                stmt.setBoolean(parameterIndex, "1".equals(parameterValue) || "t".equalsIgnoreCase(parameterValue) || Boolean.parseBoolean(parameterValue));
+                break;
+            case Types.BOOLEAN:
+                stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
+                break;
+            case Types.TINYINT:
+                stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
+                break;
+            case Types.SMALLINT:
+                stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
+                break;
+            case Types.INTEGER:
+                stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
+                break;
+            case Types.BIGINT:
+                stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
+                break;
+            case Types.REAL:
+                stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
+                break;
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
+                break;
+            case Types.DECIMAL:
+            case Types.NUMERIC:
+                stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue));
+                break;
+            case Types.DATE: {
+                final java.sql.Date date;
+                if (format.isEmpty()) {
+                    if (LONG_PATTERN.matcher(parameterValue).matches()) {
+                        // Plain integer text is taken as epoch milliseconds.
+                        date = new java.sql.Date(Long.parseLong(parameterValue));
+                    } else {
+                        date = new java.sql.Date(new SimpleDateFormat("yyyy-MM-dd").parse(parameterValue).getTime());
+                    }
+                } else {
+                    final LocalDate parsedDate = LocalDate.parse(parameterValue, getDateTimeFormatter(format));
+                    // Midnight of the parsed day in the system zone.
+                    date = new java.sql.Date(java.util.Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime());
+                }
+                stmt.setDate(parameterIndex, date);
+                break;
+            }
+            case Types.TIME: {
+                final Time time;
+                if (format.isEmpty()) {
+                    if (LONG_PATTERN.matcher(parameterValue).matches()) {
+                        time = new Time(Long.parseLong(parameterValue));
+                    } else {
+                        time = new Time(new SimpleDateFormat("HH:mm:ss.SSS").parse(parameterValue).getTime());
+                    }
+                } else {
+                    final LocalTime parsedTime = LocalTime.parse(parameterValue, getDateTimeFormatter(format));
+                    // Anchor the time of day to 1970-01-01 in the system zone to obtain epoch milliseconds.
+                    final Instant instant = parsedTime.atDate(LocalDate.ofEpochDay(0)).atZone(ZoneId.systemDefault()).toInstant();
+                    time = new Time(instant.toEpochMilli());
+                }
+                stmt.setTime(parameterIndex, time);
+                break;
+            }
+            case Types.TIMESTAMP: {
+                final long epochMillis;
+                // Backwards compatibility note: Format was unsupported for a timestamp field.
+                if (format.isEmpty()) {
+                    if (LONG_PATTERN.matcher(parameterValue).matches()) {
+                        epochMillis = Long.parseLong(parameterValue);
+                    } else {
+                        epochMillis = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(parameterValue).getTime();
+                    }
+                } else {
+                    final TemporalAccessor accessor = getDateTimeFormatter(format).parse(parameterValue);
+                    final Instant instant;
+                    if (accessor.isSupported(java.time.temporal.ChronoField.INSTANT_SECONDS)) {
+                        instant = Instant.from(accessor);
+                    } else {
+                        // The text carried no zone or offset (e.g. ISO_LOCAL_DATE_TIME): interpret it in the
+                        // system zone, exactly as the DATE and TIME branches do, instead of failing.
+                        instant = LocalDateTime.from(accessor).atZone(ZoneId.systemDefault()).toInstant();
+                    }
+                    epochMillis = java.util.Date.from(instant).getTime();
+                }
+                stmt.setTimestamp(parameterIndex, new Timestamp(epochMillis));
+                break;
+            }
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY: {
+                final byte[] bValue;
+                switch (format) {
+                    case "":
+                    case "ascii":
+                        bValue = parameterValue.getBytes(java.nio.charset.StandardCharsets.US_ASCII);
+                        break;
+                    case "hex":
+                        bValue = DatatypeConverter.parseHexBinary(parameterValue);
+                        break;
+                    case "base64":
+                        bValue = DatatypeConverter.parseBase64Binary(parameterValue);
+                        break;
+                    default:
+                        throw new ParseException("Unable to parse binary data using the formatter `" + format + "`.",0);
+                }
+                stmt.setBinaryStream(parameterIndex, new ByteArrayInputStream(bValue), bValue.length);
+                break;
+            }
+            case Types.CHAR:
+            case Types.VARCHAR:
+            case Types.LONGNVARCHAR:
+            case Types.LONGVARCHAR:
+                stmt.setString(parameterIndex, parameterValue);
+                break;
+            case Types.CLOB:
+                // The driver may not read the stream until the statement executes, so the reader must stay
+                // open after this call; a StringReader holds no system resources and needs no explicit close.
+                stmt.setCharacterStream(parameterIndex, new StringReader(parameterValue));
+                break;
+            case Types.NCLOB:
+                // Same reasoning as for CLOB: do not close the reader before execution.
+                stmt.setNCharacterStream(parameterIndex, new StringReader(parameterValue));
+                break;
+            default:
+                stmt.setObject(parameterIndex, parameterValue, jdbcType);
+                break;
+        }
+    }
+
+    /**
+     * Resolves a format specification to a {@link DateTimeFormatter}.
+     * <p>
+     * If the specification is the name of one of the predefined {@code DateTimeFormatter} constants listed below,
+     * that constant is returned; any other string is compiled as a pattern with
+     * {@link DateTimeFormatter#ofPattern(String)}.
+     *
+     * @param pattern the name of a predefined formatter, or a date/time pattern
+     * @return the matching predefined formatter, or a formatter built from the pattern
+     */
+    public static DateTimeFormatter getDateTimeFormatter(String pattern) {
+        final DateTimeFormatter formatter;
+        switch (pattern) {
+            case "BASIC_ISO_DATE":
+                formatter = DateTimeFormatter.BASIC_ISO_DATE;
+                break;
+            case "ISO_LOCAL_DATE":
+                formatter = DateTimeFormatter.ISO_LOCAL_DATE;
+                break;
+            case "ISO_OFFSET_DATE":
+                formatter = DateTimeFormatter.ISO_OFFSET_DATE;
+                break;
+            case "ISO_DATE":
+                formatter = DateTimeFormatter.ISO_DATE;
+                break;
+            case "ISO_LOCAL_TIME":
+                formatter = DateTimeFormatter.ISO_LOCAL_TIME;
+                break;
+            case "ISO_OFFSET_TIME":
+                formatter = DateTimeFormatter.ISO_OFFSET_TIME;
+                break;
+            case "ISO_TIME":
+                formatter = DateTimeFormatter.ISO_TIME;
+                break;
+            case "ISO_LOCAL_DATE_TIME":
+                formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+                break;
+            case "ISO_OFFSET_DATE_TIME":
+                formatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
+                break;
+            case "ISO_ZONED_DATE_TIME":
+                formatter = DateTimeFormatter.ISO_ZONED_DATE_TIME;
+                break;
+            case "ISO_DATE_TIME":
+                formatter = DateTimeFormatter.ISO_DATE_TIME;
+                break;
+            case "ISO_ORDINAL_DATE":
+                formatter = DateTimeFormatter.ISO_ORDINAL_DATE;
+                break;
+            case "ISO_WEEK_DATE":
+                formatter = DateTimeFormatter.ISO_WEEK_DATE;
+                break;
+            case "ISO_INSTANT":
+                formatter = DateTimeFormatter.ISO_INSTANT;
+                break;
+            case "RFC_1123_DATE_TIME":
+                formatter = DateTimeFormatter.RFC_1123_DATE_TIME;
+                break;
+            default:
+                // Not a known constant name: treat the text as a pattern.
+                formatter = DateTimeFormatter.ofPattern(pattern);
+                break;
+        }
+        return formatter;
+    }
+
+    /**
      * An interface for callback methods which allows processing of a row 
during the convertToAvroStream() processing.
      * <b>IMPORTANT:</b> This method should only work on the row pointed at by 
the current ResultSet reference.
      * Advancing the cursor (e.g.) can cause rows to be skipped during Avro 
transformation.

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5ca7adb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 69b7ae5..3a0b773 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -79,6 +79,14 @@ public class TestExecuteSQL {
         + " from persons PER, products PRD, relationships REL"
         + " where PER.ID = 10";
 
+    final static String QUERY_WITHOUT_EL_WITH_PARAMS = "select "
+            + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as 
PersonCode"
+            + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as 
ProductCode"
+            + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as 
RelCode"
+            + ", ROW_NUMBER() OVER () as rownr "
+            + " from persons PER, products PRD, relationships REL"
+            + " where PER.ID < ? AND REL.ID < ?";
+
 
     @BeforeClass
     public static void setupClass() {
@@ -124,23 +132,35 @@ public class TestExecuteSQL {
     @Test
     public void testNoIncomingConnection() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
         runner.setIncomingConnection(false);
-        invokeOnTrigger(null, QUERY_WITHOUT_EL, false, true);
+        invokeOnTrigger(null, QUERY_WITHOUT_EL, false, null, true);
     }
 
     @Test
     public void testNoTimeLimit() throws InitializationException, 
ClassNotFoundException, SQLException, IOException {
-        invokeOnTrigger(null, QUERY_WITH_EL, true, true);
+        invokeOnTrigger(null, QUERY_WITH_EL, true, null, true);
     }
 
     @Test
     public void testSelectQueryInFlowFile() throws InitializationException, 
ClassNotFoundException, SQLException, IOException {
-        invokeOnTrigger(null, QUERY_WITHOUT_EL, true, false);
+        invokeOnTrigger(null, QUERY_WITHOUT_EL, true, null, false);
+    }
+
+    @Test
+    public void testSelectQueryInFlowFileWithParameters() throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        // Bind the two '?' placeholders of the query; JDBC type code 4 is java.sql.Types.INTEGER.
+        // A plain mutable HashMap is used (rather than double-brace initialization, which creates an anonymous
+        // subclass) because invokeOnTrigger adds further attributes to the map it is given.
+        final Map<String, String> sqlParams = new HashMap<>();
+        sqlParams.put("sql.args.1.type", "4");
+        sqlParams.put("sql.args.1.value", "20");
+        sqlParams.put("sql.args.2.type", "4");
+        sqlParams.put("sql.args.2.value", "5");
+
+        invokeOnTrigger(null, QUERY_WITHOUT_EL_WITH_PARAMS, true, sqlParams, false);
     }
 
     @Test
     public void testQueryTimeout() throws InitializationException, 
ClassNotFoundException, SQLException, IOException {
         // Does not seem to have any effect when using embedded Derby
-        invokeOnTrigger(1, QUERY_WITH_EL, true, true); // 1 second max time
+        invokeOnTrigger(1, QUERY_WITH_EL, true, null, true); // 1 second max 
time
     }
 
     @Test
@@ -172,7 +192,7 @@ public class TestExecuteSQL {
     }
 
     @Test
-    public void testWithduplicateColumns() throws SQLException {
+    public void testWithDuplicateColumns() throws SQLException {
         // remove previous test database, if any
         final File dbLocation = new File(DB_LOCATION);
         dbLocation.delete();
@@ -228,7 +248,7 @@ public class TestExecuteSQL {
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 0);
     }
 
-    public void invokeOnTrigger(final Integer queryTimeout, final String 
query, final boolean incomingFlowFile, final boolean setQueryProperty)
+    public void invokeOnTrigger(final Integer queryTimeout, final String 
query, final boolean incomingFlowFile, final Map<String,String> attrs, final 
boolean setQueryProperty)
         throws InitializationException, ClassNotFoundException, SQLException, 
IOException {
 
         if (queryTimeout != null) {
@@ -250,7 +270,7 @@ public class TestExecuteSQL {
 
         if (incomingFlowFile) {
             // incoming FlowFile content is not used, but attributes are used
-            final Map<String, String> attributes = new HashMap<>();
+            final Map<String, String> attributes = (attrs == null) ? new 
HashMap<>() : attrs;
             attributes.put("person.id", "10");
             if (!setQueryProperty) {
                 runner.enqueue(query.getBytes(), attributes);

Reply via email to