[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664276#comment-16664276 ]

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r228331847
  
    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
    @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
     
         @Override
         public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile inputFlowFile = null;
             FlowFile fileToProcess = null;
    +
    +        Map<String, String> attributes = null;
    +
             if (context.hasIncomingConnection()) {
    -            fileToProcess = session.get();
    +            inputFlowFile = session.get();
     
                 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
                 // However, if we have no FlowFile and we have connections coming from other Processors, then
                 // we know that we should run only if we have a FlowFile.
    -            if (fileToProcess == null && context.hasNonLoopConnection()) {
    +            if (inputFlowFile == null && context.hasNonLoopConnection()) {
                     return;
                 }
    +
    +            attributes = inputFlowFile.getAttributes();
             }
     
             final ComponentLog logger = getLogger();
    -        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
    -        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
    +        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
    +        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
             final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
    -        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
    +        final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
    +        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
             final StopWatch stopWatch = new StopWatch(true);
     
    -        if (fileToProcess == null) {
    -            fileToProcess = session.create();
    +        if(inputFlowFile != null){
    +            session.transfer(inputFlowFile, REL_ORIGINAL);
             }
     
             try {
                 // The documentation for the driver recommends the session remain open the entire time the processor is running
                 // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
                 final Session connectionSession = cassandraSession.get();
    -            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
    +            final ResultSet resultSet;
    +
    +            if (queryTimeout > 0) {
    +                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
    +            }else{
    +                resultSet = connectionSession.execute(selectQuery);
    +            }
    +
                 final AtomicLong nrOfRows = new AtomicLong(0L);
     
    -            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
    -                @Override
    -                public void process(final OutputStream out) throws IOException {
    -                    try {
    -                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
    -                        final ResultSet resultSet;
    -                        if (queryTimeout > 0) {
    -                            resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
    -                            if (AVRO_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
    -                            } else if (JSON_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
    -                            }
    -                        } else {
    -                            resultSet = queryFuture.getUninterruptibly();
    -                            if (AVRO_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
    -                            } else if (JSON_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null));
    +            do {
    +                fileToProcess = session.create();
    +
    +                // Assuring that if we have an input FlowFile
    +                // the generated output inherit the attributes
    +                if(attributes != null){
    +                    fileToProcess = session.putAllAttributes(fileToProcess, attributes);
    +                }
    +
    +                fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
    +                    @Override
    +                    public void process(final OutputStream out) throws IOException {
    +                        try {
    +                            logger.debug("Executing CQL query {}", new Object[]{selectQuery});
    +                            if (queryTimeout > 0) {
    +                                if (AVRO_FORMAT.equals(outputFormat)) {
    +                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
    +                                            out, queryTimeout, TimeUnit.MILLISECONDS));
    +                                } else if (JSON_FORMAT.equals(outputFormat)) {
    +                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
    +                                            out, charset, queryTimeout, TimeUnit.MILLISECONDS));
    +                                }
    +                            } else {
    +                                if (AVRO_FORMAT.equals(outputFormat)) {
    +                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
    +                                            out, 0, null));
    +                                } else if (JSON_FORMAT.equals(outputFormat)) {
    +                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
    +                                            out, charset, 0, null));
    +                                }
                                 }
    -                        }
     
    -                    } catch (final TimeoutException | InterruptedException | ExecutionException e) {
    -                        throw new ProcessException(e);
    +                        } catch (final TimeoutException | InterruptedException | ExecutionException e) {
    +                            throw new ProcessException(e);
    +                        }
                         }
    -                }
    -            });
    +                });
    +
     
    -            // set attribute how many rows were selected
    -            fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
    +                // set attribute how many rows were selected
    +                fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
     
    -            // set mime.type based on output format
    -            fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
    -                    JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
    +                // set mime.type based on output format
    +                fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
    +                        JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
     
    -            logger.info("{} contains {} Avro records; transferring to 'success'",
    -                    new Object[]{fileToProcess, nrOfRows.get()});
    -            session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
    -                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    -            session.transfer(fileToProcess, REL_SUCCESS);
    +                logger.info("{} contains {} records; transferring to 'success'",
    +                        new Object[]{fileToProcess, nrOfRows.get()});
    +                session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
    +                        stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    +                session.transfer(fileToProcess, REL_SUCCESS);
    +                session.commit();
    --- End diff --
    
    This is kind of a mashup of two features, Max Rows Per FlowFile and Output Batch Size (at least in terms of the SQL versions of these features). The former limits the number of rows in each flow file, and is often used (without Output Batch Size) when some downstream processor needs to read the entire content into memory. The latter (Output Batch Size) is the number of flow files to commit at a time, and is used to "get things moving downstream" by actually sending the flow files to the outgoing connection(s), rather than just marking them for transfer and sending them all only once the entire result set has been processed (no matter how many rows per flow file). A sketch of the two controls side by side follows.
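    To make the distinction concrete, here is a minimal sketch of how the two controls could coexist in onTrigger. This is only illustrative: the OUTPUT_BATCH_SIZE property and the writeRowsToFlowFile() helper are hypothetical, modeled on the SQL processors (e.g. QueryDatabaseTable), not code from this PR:
    
        // Both controls evaluated up front; 0 means "unlimited" / "single commit at the end".
        final int maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE)
                .evaluateAttributeExpressions(inputFlowFile).asInteger();
        final int outputBatchSize = context.getProperty(OUTPUT_BATCH_SIZE)   // hypothetical property
                .evaluateAttributeExpressions(inputFlowFile).asInteger();
    
        int flowFilesInBatch = 0;
        while (!resultSet.isExhausted()) {
            FlowFile flowFile = session.create();
            // Max Rows Per FlowFile: cap the content of each individual flow file.
            flowFile = writeRowsToFlowFile(session, flowFile, resultSet, maxRowsPerFlowFile); // hypothetical helper
            session.transfer(flowFile, REL_SUCCESS);
            flowFilesInBatch++;
    
            // Output Batch Size: commit every N flow files so they actually move
            // downstream before the whole result set has been read.
            if (outputBatchSize > 0 && flowFilesInBatch >= outputBatchSize) {
                session.commit();
                flowFilesInBatch = 0;
            }
        }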
    
    My preference is to keep these features separate, but I'm happy to discuss it if you don't see the need for letting the user choose either, both, or neither. I just think it makes things more flexible for the user.


> QueryCassandra processor: output FlowFiles as soon as fetch_size is reached
> ---------------------------------------------------------------------------
>
>                 Key: NIFI-5642
>                 URL: https://issues.apache.org/jira/browse/NIFI-5642
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.7.1
>            Reporter: André Gomes Lamas Otero
>            Priority: Major
>
> When I'm using QueryCassandra together with the fetch_size parameter, I expected
> that as soon as the reader reaches fetch_size, the processor would output some
> data to be processed by the next processor. Instead, QueryCassandra reads all
> the data and only then outputs the flow files.
> I'll start working on a patch for this; I'd appreciate any suggestions.
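
For reference, the behavior the reporter describes maps onto the DataStax Java driver's paging: with a fetch size set, the driver pulls rows one page at a time as the ResultSet is iterated, so a processor can emit a flow file per page instead of draining the whole result set first. A minimal sketch, assuming the 3.x driver API; writeRow() and emitFlowFile() are hypothetical helpers standing in for the processor's write/transfer/commit logic:

    Statement statement = new SimpleStatement(selectQuery);
    statement.setFetchSize(fetchSize);            // rows fetched per page
    ResultSet rs = connectionSession.execute(statement);

    int rowsInCurrentFile = 0;
    for (Row row : rs) {                          // iteration fetches pages lazily
        writeRow(row);                            // hypothetical: append row to the open flow file
        if (++rowsInCurrentFile >= fetchSize) {
            emitFlowFile();                       // hypothetical: transfer + commit so data moves on
            rowsInCurrentFile = 0;
        }
    }
    if (rowsInCurrentFile > 0) {
        emitFlowFile();                           // flush the final partial page
    }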



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
