Repository: nifi
Updated Branches:
  refs/heads/master fd00df3d2 -> 4edafad6e


NIFI-4473: Add support for large result sets and normalizing Avro names to 
SelectHiveQL

Signed-off-by: Pierre Villard <[email protected]>

This closes #2212.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4edafad6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4edafad6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4edafad6

Branch: refs/heads/master
Commit: 4edafad6e5dc268136e191cdb7e8056af95e14cc
Parents: fd00df3
Author: Matthew Burgess <[email protected]>
Authored: Mon Oct 16 13:38:56 2017 -0400
Committer: Pierre Villard <[email protected]>
Committed: Wed Oct 18 13:37:31 2017 +0200

----------------------------------------------------------------------
 .../nifi/processors/hive/SelectHiveQL.java      | 248 +++++++++++++------
 .../apache/nifi/util/hive/CsvOutputOptions.java |   9 +-
 .../apache/nifi/util/hive/HiveJdbcCommon.java   |  51 +++-
 .../nifi/processors/hive/TestSelectHiveQL.java  | 201 ++++++++++++++-
 4 files changed, 416 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4edafad6/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
index e61fa9f..fb05914 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
@@ -16,9 +16,6 @@
  */
 package org.apache.nifi.processors.hive;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -30,6 +27,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -53,14 +51,18 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.pattern.PartialFunctions;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.hive.CsvOutputOptions;
 import org.apache.nifi.util.hive.HiveJdbcCommon;
 
+import static org.apache.nifi.util.hive.HiveJdbcCommon.AVRO;
+import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV;
+import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV_MIME_TYPE;
+import static org.apache.nifi.util.hive.HiveJdbcCommon.MIME_TYPE_AVRO_BINARY;
+import static 
org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
+
 @EventDriven
 @InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"hive", "sql", "select", "jdbc", "query", "database"})
@@ -72,19 +74,21 @@ import org.apache.nifi.util.hive.HiveJdbcCommon;
 @WritesAttributes({
         @WritesAttribute(attribute = "mime.type", description = "Sets the MIME 
type for the outgoing flowfile to application/avro-binary for Avro or text/csv 
for CSV."),
         @WritesAttribute(attribute = "filename", description = "Adds .avro or 
.csv to the filename attribute depending on which output format is selected."),
-        @WritesAttribute(attribute = "selecthiveql.row.count", description = 
"Indicates how many rows were selected/returned by the query.")
+        @WritesAttribute(attribute = "selecthiveql.row.count", description = 
"Indicates how many rows were selected/returned by the query."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Rows Per Flow File' is set then all FlowFiles from the same query result 
set "
+                + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Rows Per Flow File' is set then this is the total number of  "
+                + "FlowFiles produced by a single ResultSet. This can be used 
in conjunction with the "
+                + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max 
Rows Per Flow File' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order  "
+                + "FlowFiles were produced")
 })
 public class SelectHiveQL extends AbstractHiveQLProcessor {
 
     public static final String RESULT_ROW_COUNT = "selecthiveql.row.count";
 
-    protected static final String AVRO = "Avro";
-    protected static final String CSV = "CSV";
-
-    public static final String AVRO_MIME_TYPE = "application/avro-binary";
-    public static final String CSV_MIME_TYPE = "text/csv";
-
-
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -99,12 +103,45 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
     public static final PropertyDescriptor HIVEQL_SELECT_QUERY = new 
PropertyDescriptor.Builder()
             .name("hive-query")
             .displayName("HiveQL Select Query")
-            .description("HiveQL SELECT query to execute")
+            .description("HiveQL SELECT query to execute. If this is not set, 
the query is assumed to be in the content of an incoming FlowFile.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(true)
             .build();
 
+    public static final PropertyDescriptor FETCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("hive-fetch-size")
+            .displayName("Fetch Size")
+            .description("The number of result rows to be fetched from the 
result set at a time. This is a hint to the driver and may not be "
+                    + "honored and/or exact. If the value specified is zero, 
then the hint is ignored.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new 
PropertyDescriptor.Builder()
+            .name("hive-max-rows")
+            .displayName("Max Rows Per Flow File")
+            .description("The maximum number of result rows that will be 
included in a single FlowFile. " +
+                    "This will allow you to break up very large result sets 
into multiple FlowFiles. If the value specified is zero, then all rows are 
returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor MAX_FRAGMENTS = new 
PropertyDescriptor.Builder()
+            .name("hive-max-frags")
+            .displayName("Maximum Number of Fragments")
+            .description("The maximum number of fragments. If the value 
specified is zero, then all fragments are returned. " +
+                    "This prevents OutOfMemoryError when this processor 
ingests huge table.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
     public static final PropertyDescriptor HIVEQL_CSV_HEADER = new 
PropertyDescriptor.Builder()
             .name("csv-header")
             .displayName("CSV Header")
@@ -174,7 +211,11 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.add(HIVE_DBCP_SERVICE);
         _propertyDescriptors.add(HIVEQL_SELECT_QUERY);
+        _propertyDescriptors.add(FETCH_SIZE);
+        _propertyDescriptors.add(MAX_ROWS_PER_FLOW_FILE);
+        _propertyDescriptors.add(MAX_FRAGMENTS);
         _propertyDescriptors.add(HIVEQL_OUTPUT_FORMAT);
+        _propertyDescriptors.add(NORMALIZE_NAMES_FOR_AVRO);
         _propertyDescriptors.add(HIVEQL_CSV_HEADER);
         _propertyDescriptors.add(HIVEQL_CSV_ALT_HEADER);
         _propertyDescriptors.add(HIVEQL_CSV_DELIMITER);
@@ -216,7 +257,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
     }
 
     private void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final FlowFile fileToProcess = (context.hasIncomingConnection()? 
session.get():null);
+        final FlowFile fileToProcess = (context.hasIncomingConnection() ? 
session.get() : null);
         FlowFile flowfile = null;
 
         // If we have no FlowFile, and all incoming connections are self-loops 
then we can continue on.
@@ -243,95 +284,152 @@ public class SelectHiveQL extends 
AbstractHiveQLProcessor {
             // If the query is not set, then an incoming flow file is 
required, and expected to contain a valid SQL select query.
             // If there is no incoming connection, onTrigger will not be 
called as the processor will fail when scheduled.
             final StringBuilder queryContents = new StringBuilder();
-            session.read(fileToProcess, new InputStreamCallback() {
-                @Override
-                public void process(InputStream in) throws IOException {
-                    queryContents.append(IOUtils.toString(in));
-                }
-            });
+            session.read(fileToProcess, in -> 
queryContents.append(IOUtils.toString(in, charset)));
             selectQuery = queryContents.toString();
         }
 
 
+        final Integer fetchSize = 
context.getProperty(FETCH_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
+        final Integer maxRowsPerFlowFile = 
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions(fileToProcess).asInteger();
+        final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
+                ? 
context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions(fileToProcess).asInteger()
+                : 0;
         final String outputFormat = 
context.getProperty(HIVEQL_OUTPUT_FORMAT).getValue();
+        final boolean convertNamesForAvro = 
context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
         final StopWatch stopWatch = new StopWatch(true);
         final boolean header = 
context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
         final String altHeader = 
context.getProperty(HIVEQL_CSV_ALT_HEADER).evaluateAttributeExpressions(fileToProcess).getValue();
         final String delimiter = 
context.getProperty(HIVEQL_CSV_DELIMITER).evaluateAttributeExpressions(fileToProcess).getValue();
         final boolean quote = 
context.getProperty(HIVEQL_CSV_QUOTE).asBoolean();
         final boolean escape = 
context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
+        final String fragmentIdentifier = UUID.randomUUID().toString();
 
         try (final Connection con = dbcpService.getConnection();
-             final Statement st = ( flowbased ? 
con.prepareStatement(selectQuery): con.createStatement())
+             final Statement st = (flowbased ? 
con.prepareStatement(selectQuery) : con.createStatement())
         ) {
 
-            final AtomicLong nrOfRows = new AtomicLong(0L);
-            if (fileToProcess == null) {
-                flowfile = session.create();
-            } else {
-                flowfile = fileToProcess;
+            if (fetchSize != null && fetchSize > 0) {
+                try {
+                    st.setFetchSize(fetchSize);
+                } catch (SQLException se) {
+                    // Not all drivers support this, just log the error (at 
debug level) and move on
+                    logger.debug("Cannot set fetch size to {} due to {}", new 
Object[]{fetchSize, se.getLocalizedMessage()}, se);
+                }
             }
 
-            flowfile = session.write(flowfile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException 
{
-                    try {
-                        logger.debug("Executing query {}", new 
Object[]{selectQuery});
-                        if (flowbased) {
-                            // Hive JDBC Doesn't Support this yet:
-                            // ParameterMetaData pmd = 
((PreparedStatement)st).getParameterMetaData();
-                            // int paramCount = pmd.getParameterCount();
+            final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+            try {
+                logger.debug("Executing query {}", new Object[]{selectQuery});
+                if (flowbased) {
+                    // Hive JDBC Doesn't Support this yet:
+                    // ParameterMetaData pmd = 
((PreparedStatement)st).getParameterMetaData();
+                    // int paramCount = pmd.getParameterCount();
+
+                    // Alternate way to determine number of params in SQL.
+                    int paramCount = StringUtils.countMatches(selectQuery, 
"?");
+
+                    if (paramCount > 0) {
+                        setParameters(1, (PreparedStatement) st, paramCount, 
flowfile.getAttributes());
+                    }
+                }
 
-                            // Alternate way to determine number of params in 
SQL.
-                            int paramCount = 
StringUtils.countMatches(selectQuery, "?");
+                final ResultSet resultSet;
 
-                            if (paramCount > 0) {
-                                setParameters(1, (PreparedStatement) st, 
paramCount, fileToProcess.getAttributes());
+                try {
+                    resultSet = (flowbased ? ((PreparedStatement) 
st).executeQuery() : st.executeQuery(selectQuery));
+                } catch (SQLException se) {
+                    // If an error occurs during the query, a flowfile is 
expected to be routed to failure, so create one here (the original will be 
removed)
+                    flowfile = session.create(fileToProcess);
+                    throw se;
+                }
+
+                int fragmentIndex = 0;
+                String baseFilename = (fileToProcess != null) ? 
fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) : null;
+                while (true) {
+                    final AtomicLong nrOfRows = new AtomicLong(0L);
+                    flowfile = (flowfile == null) ? session.create() : 
session.create(flowfile);
+                    if (baseFilename == null) {
+                        baseFilename = 
flowfile.getAttribute(CoreAttributes.FILENAME.key());
+                    }
+                    try {
+                        flowfile = session.write(flowfile, out -> {
+                            try {
+                                if (AVRO.equals(outputFormat)) {
+                                    
nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out, 
maxRowsPerFlowFile, convertNamesForAvro));
+                                } else if (CSV.equals(outputFormat)) {
+                                    CsvOutputOptions options = new 
CsvOutputOptions(header, altHeader, delimiter, quote, escape, 
maxRowsPerFlowFile);
+                                    
nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out, options));
+                                } else {
+                                    nrOfRows.set(0L);
+                                    throw new ProcessException("Unsupported 
output format: " + outputFormat);
+                                }
+                            } catch (final SQLException | RuntimeException e) {
+                                throw new ProcessException("Error during 
database query or conversion of records.", e);
                             }
-                        }
+                        });
+                    } catch (ProcessException e) {
+                        // Add flowfile to results before rethrowing so it 
will be removed from session in outer catch
+                        resultSetFlowFiles.add(flowfile);
+                        throw e;
+                    }
 
-                        final ResultSet resultSet = (flowbased ? 
((PreparedStatement)st).executeQuery(): st.executeQuery(selectQuery));
+                    if (nrOfRows.get() > 0 || resultSetFlowFiles.isEmpty()) {
+                        // Set attribute for how many rows were selected
+                        flowfile = session.putAttribute(flowfile, 
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
 
+                        // Set MIME type on output document and add extension 
to filename
                         if (AVRO.equals(outputFormat)) {
-                            
nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out));
+                            flowfile = session.putAttribute(flowfile, 
CoreAttributes.MIME_TYPE.key(), MIME_TYPE_AVRO_BINARY);
+                            flowfile = session.putAttribute(flowfile, 
CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".avro");
                         } else if (CSV.equals(outputFormat)) {
-                            CsvOutputOptions options = new 
CsvOutputOptions(header, altHeader, delimiter, quote, escape);
-                            
nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out,options));
+                            flowfile = session.putAttribute(flowfile, 
CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE);
+                            flowfile = session.putAttribute(flowfile, 
CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".csv");
+                        }
+
+                        if (maxRowsPerFlowFile > 0) {
+                            flowfile = session.putAttribute(flowfile, 
"fragment.identifier", fragmentIdentifier);
+                            flowfile = session.putAttribute(flowfile, 
"fragment.index", String.valueOf(fragmentIndex));
+                        }
+
+                        logger.info("{} contains {} Avro records; transferring 
to 'success'",
+                                new Object[]{flowfile, nrOfRows.get()});
+
+                        if (context.hasIncomingConnection()) {
+                            // If the flow file came from an incoming 
connection, issue a Modify Content provenance event
+
+                            
session.getProvenanceReporter().modifyContent(flowfile, "Retrieved " + 
nrOfRows.get() + " rows",
+                                    
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                         } else {
-                            nrOfRows.set(0L);
-                            throw new ProcessException("Unsupported output 
format: " + outputFormat);
+                            // If we created a flow file from rows received 
from Hive, issue a Receive provenance event
+                            session.getProvenanceReporter().receive(flowfile, 
dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                         }
-                    } catch (final SQLException e) {
-                        throw new ProcessException(e);
+                        resultSetFlowFiles.add(flowfile);
+                    } else {
+                        // If there were no rows returned (and the first flow 
file has been sent, we're done processing, so remove the flowfile and carry on
+                        session.remove(flowfile);
+                        break;
                     }
-                }
-            });
-
-            // Set attribute for how many rows were selected
-            flowfile = session.putAttribute(flowfile, RESULT_ROW_COUNT, 
String.valueOf(nrOfRows.get()));
-
-            // Set MIME type on output document and add extension to filename
-            if (AVRO.equals(outputFormat)) {
-                flowfile = session.putAttribute(flowfile, 
CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE);
-                flowfile = session.putAttribute(flowfile, 
CoreAttributes.FILENAME.key(), 
flowfile.getAttribute(CoreAttributes.FILENAME.key()) + ".avro");
-            } else if (CSV.equals(outputFormat)) {
-                flowfile = session.putAttribute(flowfile, 
CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE);
-                flowfile = session.putAttribute(flowfile, 
CoreAttributes.FILENAME.key(), 
flowfile.getAttribute(CoreAttributes.FILENAME.key()) + ".csv");
-            }
 
-            logger.info("{} contains {} Avro records; transferring to 
'success'",
-                    new Object[]{flowfile, nrOfRows.get()});
+                    fragmentIndex++;
+                    if (maxFragments > 0 && fragmentIndex >= maxFragments) {
+                        break;
+                    }
+                }
 
-            if (context.hasIncomingConnection()) {
-                // If the flow file came from an incoming connection, issue a 
Modify Content provenance event
+                for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                    // Set count on all FlowFiles
+                    if (maxRowsPerFlowFile > 0) {
+                        resultSetFlowFiles.set(i,
+                                
session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", 
Integer.toString(fragmentIndex)));
+                    }
+                }
 
-                session.getProvenanceReporter().modifyContent(flowfile, 
"Retrieved " + nrOfRows.get() + " rows",
-                        stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            } else {
-                // If we created a flow file from rows received from Hive, 
issue a Receive provenance event
-                session.getProvenanceReporter().receive(flowfile, 
dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            } catch (final SQLException e) {
+                throw e;
             }
-            session.transfer(flowfile, REL_SUCCESS);
+
+            session.transfer(resultSetFlowFiles, REL_SUCCESS);
+
         } catch (final ProcessException | SQLException e) {
             logger.error("Issue processing SQL {} due to {}.", new 
Object[]{selectQuery, e});
             if (flowfile == null) {
@@ -352,7 +450,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
                 session.transfer(flowfile, REL_FAILURE);
             }
         } finally {
-
+            if (fileToProcess != null) {
+                session.remove(fileToProcess);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4edafad6/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/CsvOutputOptions.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/CsvOutputOptions.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/CsvOutputOptions.java
index bad6926..3688912 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/CsvOutputOptions.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/CsvOutputOptions.java
@@ -24,6 +24,8 @@ public class CsvOutputOptions {
     private boolean quote = false;
     private boolean escape = true;
 
+    private int maxRowsPerFlowFile = 0;
+
     public boolean isHeader() {
         return header;
     }
@@ -46,11 +48,16 @@ public class CsvOutputOptions {
         return escape;
     }
 
-    public CsvOutputOptions(boolean header, String altHeader, String 
delimiter, boolean quote, boolean escape) {
+    public int getMaxRowsPerFlowFile() {
+        return maxRowsPerFlowFile;
+    }
+
+    public CsvOutputOptions(boolean header, String altHeader, String 
delimiter, boolean quote, boolean escape, int maxRowsPerFlowFile) {
         this.header = header;
         this.altHeader = altHeader;
         this.delimiter = delimiter;
         this.quote = quote;
         this.escape = escape;
+        this.maxRowsPerFlowFile = maxRowsPerFlowFile;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4edafad6/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
index 83d4e22..ff06495 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.nifi.components.PropertyDescriptor;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -78,14 +79,31 @@ import static java.sql.Types.VARCHAR;
  */
 public class HiveJdbcCommon {
 
-    public static long convertToAvroStream(final ResultSet rs, final 
OutputStream outStream) throws SQLException, IOException {
-        return convertToAvroStream(rs, outStream, null, null);
+    public static final String AVRO = "Avro";
+    public static final String CSV = "CSV";
+
+    public static final String MIME_TYPE_AVRO_BINARY = 
"application/avro-binary";
+    public static final String CSV_MIME_TYPE = "text/csv";
+
+
+    public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new 
PropertyDescriptor.Builder()
+            .name("hive-normalize-avro")
+            .displayName("Normalize Table/Column Names")
+            .description("Whether to change non-Avro-compatible characters in 
column names to Avro-compatible characters. For example, colons and periods "
+                    + "will be changed to underscores in order to build a 
valid Avro record.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public static long convertToAvroStream(final ResultSet rs, final 
OutputStream outStream, final int maxRows, boolean convertNames) throws 
SQLException, IOException {
+        return convertToAvroStream(rs, outStream, null, maxRows, convertNames, 
null);
     }
 
 
-    public static long convertToAvroStream(final ResultSet rs, final 
OutputStream outStream, String recordName, ResultSetRowCallback callback)
+    public static long convertToAvroStream(final ResultSet rs, final 
OutputStream outStream, String recordName, final int maxRows, boolean 
convertNames, ResultSetRowCallback callback)
             throws SQLException, IOException {
-        final Schema schema = createSchema(rs, recordName);
+        final Schema schema = createSchema(rs, recordName, convertNames);
         final GenericRecord rec = new GenericData.Record(schema);
 
         final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
@@ -157,14 +175,17 @@ public class HiveJdbcCommon {
                 }
                 dataFileWriter.append(rec);
                 nrOfRows += 1;
+
+                if (maxRows > 0 && nrOfRows == maxRows)
+                    break;
             }
 
             return nrOfRows;
         }
     }
 
-    public static Schema createSchema(final ResultSet rs) throws SQLException {
-        return createSchema(rs, null);
+    public static Schema createSchema(final ResultSet rs, boolean 
convertNames) throws SQLException {
+        return createSchema(rs, null, false);
     }
 
     /**
@@ -173,10 +194,11 @@ public class HiveJdbcCommon {
      *
      * @param rs         The result set to convert to Avro
      * @param recordName The a priori record name to use if it cannot be 
determined from the result set.
+     * @param convertNames  Whether to convert column/table names to be legal 
Avro names
      * @return A Schema object representing the result set converted to an 
Avro record
      * @throws SQLException if any error occurs during conversion
      */
-    public static Schema createSchema(final ResultSet rs, String recordName) 
throws SQLException {
+    public static Schema createSchema(final ResultSet rs, String recordName, 
boolean convertNames) throws SQLException {
         final ResultSetMetaData meta = rs.getMetaData();
         final int nrOfColumns = meta.getColumnCount();
         String tableName = StringUtils.isEmpty(recordName) ? 
"NiFi_SelectHiveQL_Record" : recordName;
@@ -196,6 +218,9 @@ public class HiveJdbcCommon {
             // Not all drivers support getTableName, so just use the 
previously-set default
         }
 
+        if (convertNames) {
+            tableName = normalizeNameForAvro(tableName);
+        }
         final FieldAssembler<Schema> builder = 
SchemaBuilder.record(tableName).namespace("any.data").fields();
 
         /**
@@ -325,6 +350,7 @@ public class HiveJdbcCommon {
         }
 
         // Iterate over the rows
+        int maxRows = outputOptions.getMaxRowsPerFlowFile();
         long nrOfRows = 0;
         while (rs.next()) {
             if (callback != null) {
@@ -388,10 +414,21 @@ public class HiveJdbcCommon {
             outStream.write(StringUtils.join(rowValues, 
outputOptions.getDelimiter()).getBytes(StandardCharsets.UTF_8));
             outStream.write("\n".getBytes(StandardCharsets.UTF_8));
             nrOfRows++;
+
+            if (maxRows > 0 && nrOfRows == maxRows)
+                break;
         }
         return nrOfRows;
     }
 
+    public static String normalizeNameForAvro(String inputName) {
+        String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_");
+        if (Character.isDigit(normalizedName.charAt(0))) {
+            normalizedName = "_" + normalizedName;
+        }
+        return normalizedName;
+    }
+
     /**
      * An interface for callback methods which allows processing of a row 
during the convertToXYZStream() processing.
      * <b>IMPORTANT:</b> This method should only work on the row pointed at by 
the current ResultSet reference.

http://git-wip-us.apache.org/repos/asf/nifi/blob/4edafad6/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
index 6ce21e9..34384ac 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
@@ -29,6 +29,7 @@ import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.hive.HiveJdbcCommon;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -51,12 +52,17 @@ import java.util.Map;
 import java.util.Random;
 
 import static 
org.apache.nifi.processors.hive.SelectHiveQL.HIVEQL_OUTPUT_FORMAT;
+import static org.apache.nifi.util.hive.HiveJdbcCommon.AVRO;
+import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV;
+import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV_MIME_TYPE;
+import static org.apache.nifi.util.hive.HiveJdbcCommon.MIME_TYPE_AVRO_BINARY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class TestSelectHiveQL {
 
     private static final Logger LOGGER;
+    private final static String MAX_ROWS_KEY = "maxRows";
 
     static {
         System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
@@ -67,14 +73,14 @@ public class TestSelectHiveQL {
         LOGGER = LoggerFactory.getLogger(TestSelectHiveQL.class);
     }
 
-    final static String DB_LOCATION = "target/db";
+    private final static String DB_LOCATION = "target/db";
 
-    final static String QUERY_WITH_EL = "select "
+    private final static String QUERY_WITH_EL = "select "
             + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as 
PersonCode"
             + " from persons PER"
             + " where PER.ID > ${person.id}";
 
-    final static String QUERY_WITHOUT_EL = "select "
+    private final static String QUERY_WITHOUT_EL = "select "
             + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as 
PersonCode"
             + " from persons PER"
             + " where PER.ID > 10";
@@ -132,6 +138,7 @@ public class TestSelectHiveQL {
         try {
             stmt.execute("drop table TEST_NULL_INT");
         } catch (final SQLException sqle) {
+            // Nothing to do, probably means the table didn't exist
         }
 
         stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
@@ -160,6 +167,7 @@ public class TestSelectHiveQL {
         try {
             stmt.execute("drop table TEST_NO_ROWS");
         } catch (final SQLException sqle) {
+            // Nothing to do, probably means the table didn't exist
         }
 
         stmt.execute("create table TEST_NO_ROWS (id integer)");
@@ -176,13 +184,13 @@ public class TestSelectHiveQL {
     @Test
     public void invokeOnTriggerWithCsv()
             throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
-        invokeOnTrigger(QUERY_WITHOUT_EL, false, SelectHiveQL.CSV);
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV);
     }
 
     @Test
     public void invokeOnTriggerWithAvro()
             throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
-        invokeOnTrigger(QUERY_WITHOUT_EL, false, SelectHiveQL.AVRO);
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, AVRO);
     }
 
     public void invokeOnTrigger(final String query, final boolean 
incomingFlowFile, String outputFormat)
@@ -230,8 +238,8 @@ public class TestSelectHiveQL {
         MockFlowFile flowFile = flowfiles.get(0);
         final InputStream in = new 
ByteArrayInputStream(flowFile.toByteArray());
         long recordsFromStream = 0;
-        if (SelectHiveQL.AVRO.equals(outputFormat)) {
-            assertEquals(SelectHiveQL.AVRO_MIME_TYPE, 
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+        if (AVRO.equals(outputFormat)) {
+            assertEquals(MIME_TYPE_AVRO_BINARY, 
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
             final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
             try (DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(in, datumReader)) {
                 GenericRecord record = null;
@@ -244,7 +252,7 @@ public class TestSelectHiveQL {
                 }
             }
         } else {
-            assertEquals(SelectHiveQL.CSV_MIME_TYPE, 
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+            assertEquals(CSV_MIME_TYPE, 
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
             BufferedReader br = new BufferedReader(new InputStreamReader(in));
 
             String headerRow = br.readLine();
@@ -256,7 +264,7 @@ public class TestSelectHiveQL {
             while ((line = br.readLine()) != null) {
                 recordsFromStream++;
                 String[] values = line.split(",");
-                if(recordsFromStream < (nrOfRows - 10)) {
+                if (recordsFromStream < (nrOfRows - 10)) {
                     assertEquals(3, values.length);
                     assertTrue(values[1].startsWith("\""));
                     assertTrue(values[1].endsWith("\""));
@@ -269,6 +277,178 @@ public class TestSelectHiveQL {
         assertEquals(recordsFromStream, 
Integer.parseInt(flowFile.getAttribute(SelectHiveQL.RESULT_ROW_COUNT)));
     }
 
+    @Test
+    public void testMaxRowsPerFlowFileAvro() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        InputStream in;
+        MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        int rowCount = 0;
+        //create larger row set
+        for (int batch = 0; batch < 100; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 
03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT * FROM 
TEST_QUERY_DB_TABLE");
+        runner.setProperty(SelectHiveQL.MAX_ROWS_PER_FLOW_FILE, "${" + 
MAX_ROWS_KEY + "}");
+        runner.setProperty(SelectHiveQL.HIVEQL_OUTPUT_FORMAT, 
HiveJdbcCommon.AVRO);
+        runner.setVariable(MAX_ROWS_KEY, "9");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 12);
+
+        //ensure all but the last file have 9 records each
+        for (int ff = 0; ff < 11; ff++) {
+            mff = 
runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(ff);
+            in = new ByteArrayInputStream(mff.toByteArray());
+            assertEquals(9, getNumberOfRecordsFromStream(in));
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(ff), 
mff.getAttribute("fragment.index"));
+            assertEquals("12", mff.getAttribute("fragment.count"));
+        }
+
+        //last file should have 1 record
+        mff = 
runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(11);
+        in = new ByteArrayInputStream(mff.toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        mff.assertAttributeExists("fragment.identifier");
+        assertEquals(Integer.toString(11), mff.getAttribute("fragment.index"));
+        assertEquals("12", mff.getAttribute("fragment.count"));
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testMaxRowsPerFlowFileCSV() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        InputStream in;
+        MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        int rowCount = 0;
+        //create larger row set
+        for (int batch = 0; batch < 100; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 
03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(SelectHiveQL.MAX_ROWS_PER_FLOW_FILE, "${" + 
MAX_ROWS_KEY + "}");
+        runner.setProperty(SelectHiveQL.HIVEQL_OUTPUT_FORMAT, 
HiveJdbcCommon.CSV);
+
+        runner.enqueue("SELECT * FROM TEST_QUERY_DB_TABLE", new 
HashMap<String, String>() {{
+            put(MAX_ROWS_KEY, "9");
+        }});
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 12);
+
+        //ensure all but the last file have 9 records (10 lines = 9 records + 
header) each
+        for (int ff = 0; ff < 11; ff++) {
+            mff = 
runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(ff);
+            in = new ByteArrayInputStream(mff.toByteArray());
+            BufferedReader br = new BufferedReader(new InputStreamReader(in));
+            assertEquals(10, br.lines().count());
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(ff), 
mff.getAttribute("fragment.index"));
+            assertEquals("12", mff.getAttribute("fragment.count"));
+        }
+
+        //last file should have 1 record (2 lines = 1 record + header)
+        mff = 
runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(11);
+        in = new ByteArrayInputStream(mff.toByteArray());
+        BufferedReader br = new BufferedReader(new InputStreamReader(in));
+        assertEquals(2, br.lines().count());
+        mff.assertAttributeExists("fragment.identifier");
+        assertEquals(Integer.toString(11), mff.getAttribute("fragment.index"));
+        assertEquals("12", mff.getAttribute("fragment.count"));
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testMaxRowsPerFlowFileWithMaxFragments() throws 
ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        InputStream in;
+        MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        int rowCount = 0;
+        //create larger row set
+        for (int batch = 0; batch < 100; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 
03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT * FROM 
TEST_QUERY_DB_TABLE");
+        runner.setProperty(SelectHiveQL.MAX_ROWS_PER_FLOW_FILE, "9");
+        Integer maxFragments = 3;
+        runner.setProperty(SelectHiveQL.MAX_FRAGMENTS, 
maxFragments.toString());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 
maxFragments);
+
+        for (int i = 0; i < maxFragments; i++) {
+            mff = 
runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(i);
+            in = new ByteArrayInputStream(mff.toByteArray());
+            assertEquals(9, getNumberOfRecordsFromStream(in));
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(i), 
mff.getAttribute("fragment.index"));
+            assertEquals(maxFragments.toString(), 
mff.getAttribute("fragment.count"));
+        }
+
+        runner.clearTransferState();
+    }
+
+    private long getNumberOfRecordsFromStream(InputStream in) throws 
IOException {
+        final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+        try (DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(in, datumReader)) {
+            GenericRecord record = null;
+            long recordsFromStream = 0;
+            while (dataFileReader.hasNext()) {
+                // Reuse record object by passing it to next(). This saves us 
from
+                // allocating and garbage collecting many objects for files 
with
+                // many items.
+                record = dataFileReader.next(record);
+                recordsFromStream += 1;
+            }
+
+            return recordsFromStream;
+        }
+    }
+
     /**
      * Simple implementation only for SelectHiveQL processor testing.
      */
@@ -283,8 +463,7 @@ public class TestSelectHiveQL {
         public Connection getConnection() throws ProcessException {
             try {
                 Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-                final Connection con = 
DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
-                return con;
+                return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION 
+ ";create=true");
             } catch (final Exception e) {
                 throw new ProcessException("getConnection failed: " + e);
             }

Reply via email to