Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r237147586
  
    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
    @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
     
         @Override
         public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile inputFlowFile = null;
             FlowFile fileToProcess = null;
    +
    +        Map<String, String> attributes = null;
    +
             if (context.hasIncomingConnection()) {
    -            fileToProcess = session.get();
    +            inputFlowFile = session.get();
     
                 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
                 // However, if we have no FlowFile and we have connections coming from other Processors, then
                 // we know that we should run only if we have a FlowFile.
    -            if (fileToProcess == null && context.hasNonLoopConnection()) {
    +            if (inputFlowFile == null && context.hasNonLoopConnection()) {
                     return;
                 }
    +
    +            attributes = inputFlowFile.getAttributes();
             }
     
             final ComponentLog logger = getLogger();
    -        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
    -        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
    +        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
    +        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
             final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
    -        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
    +        final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
    +        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
             final StopWatch stopWatch = new StopWatch(true);
     
    -        if (fileToProcess == null) {
    -            fileToProcess = session.create();
    +        if (inputFlowFile != null) {
    +            session.transfer(inputFlowFile, REL_ORIGINAL);
             }
     
             try {
                 // The documentation for the driver recommends the session remain open the entire time the processor is running
                 // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
                 final Session connectionSession = cassandraSession.get();
    -            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
    +            final ResultSet resultSet;
    +
    +            if (queryTimeout > 0) {
    +                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
    +            } else {
    +                resultSet = connectionSession.execute(selectQuery);
    +            }
    +
                 final AtomicLong nrOfRows = new AtomicLong(0L);
     
    -            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
    -                @Override
    -                public void process(final OutputStream out) throws IOException {
    -                    try {
    -                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
    -                        final ResultSet resultSet;
    -                        if (queryTimeout > 0) {
    -                            resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
    -                            if (AVRO_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
    -                            } else if (JSON_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
    -                            }
    -                        } else {
    -                            resultSet = queryFuture.getUninterruptibly();
    -                            if (AVRO_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
    -                            } else if (JSON_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null));
    +            do {
    +                fileToProcess = session.create();
    +
    +                // Ensure that if we have an input FlowFile,
    +                // the generated output inherits its attributes
    +                if (attributes != null) {
    +                    fileToProcess = session.putAllAttributes(fileToProcess, attributes);
    +                }
    +
    +                fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
    +                    @Override
    +                    public void process(final OutputStream out) throws IOException {
    +                        try {
    +                            logger.debug("Executing CQL query {}", new Object[]{selectQuery});
    +                            if (queryTimeout > 0) {
    +                                if (AVRO_FORMAT.equals(outputFormat)) {
    +                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
    +                                            out, queryTimeout, TimeUnit.MILLISECONDS));
    +                                } else if (JSON_FORMAT.equals(outputFormat)) {
    +                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
    +                                            out, charset, queryTimeout, TimeUnit.MILLISECONDS));
    +                                }
    +                            } else {
    +                                if (AVRO_FORMAT.equals(outputFormat)) {
    +                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
    +                                            out, 0, null));
    +                                } else if (JSON_FORMAT.equals(outputFormat)) {
    +                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
    +                                            out, charset, 0, null));
    +                                }
                                 }
    -                        }
     
    -                    } catch (final TimeoutException | InterruptedException | ExecutionException e) {
    -                        throw new ProcessException(e);
    +                        } catch (final TimeoutException | InterruptedException | ExecutionException e) {
    +                            throw new ProcessException(e);
    +                        }
                         }
    -                }
    -            });
    +                });
    +
     
    -            // set attribute how many rows were selected
    -            fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
    +                // set attribute how many rows were selected
    +                fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
     
    -            // set mime.type based on output format
    -            fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
    -                    JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
    +                // set mime.type based on output format
    +                fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
    +                        JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
     
    -            logger.info("{} contains {} Avro records; transferring to 'success'",
    -                    new Object[]{fileToProcess, nrOfRows.get()});
    -            session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
    -                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    -            session.transfer(fileToProcess, REL_SUCCESS);
    +                logger.info("{} contains {} records; transferring to 'success'",
    +                        new Object[]{fileToProcess, nrOfRows.get()});
    +                session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
    +                        stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    +                session.transfer(fileToProcess, REL_SUCCESS);
    +                session.commit();
    --- End diff --
    
    By the "retrieved attribute", do you mean `RESULT_ROW_COUNT`? I wasn't 
addressing that per se; rather I am noting that the session seems to always be 
committed after processing Max Rows Per Flow File. That's what I meant by the 
"mashup" of the two properties that exist in other processors. It is possible 
in those processors to specify Max Rows Per Flow File, but still have all the 
flow files transferred at once (i.e. after all the rows have been processed). 
That allows you to have smaller flow files for downstream processing, but you 
can still add attributes such as total count, fragment index, etc.
    
    I realize that doesn't solve your use case, where you want the flow files 
to be sent downstream while the rows are still being processed. That's why 
there's a separate Output Batch Size property in those processors, which, if 
set to X, commits the session after X flow files have been transferred.
    
    In this case you have one property, and you're using its value for both of 
the analogous properties in the other processors. I'm just suggesting that you 
add two separate properties, for consistency and flexibility. For your use 
case, I believe you'd just always set Output Batch Size to 1.
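    
    The separation I'm describing could be sketched roughly like this (a 
plain-Java simulation outside the NiFi API, with hypothetical names; 
`process` logs one `flowfile(n)` entry per flow file created and one `commit` 
entry each time the session would be committed):
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical sketch of the two-property scheme: "Max Rows Per Flow File"
    // controls how rows are split into flow files, while "Output Batch Size"
    // independently controls how often the session is committed.
    public class BatchingSketch {
        static List<String> process(int totalRows, int maxRowsPerFlowFile, int outputBatchSize) {
            List<String> events = new ArrayList<>();
            int rowsRemaining = totalRows;
            int uncommittedFlowFiles = 0;
            while (rowsRemaining > 0) {
                // Take up to maxRowsPerFlowFile rows into one flow file
                int rows = Math.min(maxRowsPerFlowFile, rowsRemaining);
                rowsRemaining -= rows;
                events.add("flowfile(" + rows + ")");
                uncommittedFlowFiles++;
                // Commit only after outputBatchSize flow files have been transferred
                if (outputBatchSize > 0 && uncommittedFlowFiles >= outputBatchSize) {
                    events.add("commit");
                    uncommittedFlowFiles = 0;
                }
            }
            // If Output Batch Size is unset (or flow files remain), one final commit
            // releases everything at once, as in the other processors
            if (uncommittedFlowFiles > 0 || outputBatchSize <= 0) {
                events.add("commit");
            }
            return events;
        }
    
        public static void main(String[] args) {
            // Output Batch Size = 1 reproduces the commit-per-flow-file behavior in this diff
            System.out.println(process(10, 4, 1));
            // Output Batch Size unset (0): same splitting, but a single final commit
            System.out.println(process(10, 4, 0));
        }
    }
    ```
    
    With Output Batch Size unset you keep the small flow files but can still 
decorate them with attributes like total count or fragment index before 
anything is released; setting it to 1 gives the streaming behavior in this PR.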

