levilentz commented on code in PR #6848:
URL: https://github.com/apache/nifi/pull/6848#discussion_r1090015714


##########
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java:
##########
@@ -217,60 +243,94 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
         final String selectQuery = 
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
         final long queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
         final String outputFormat = 
context.getProperty(OUTPUT_FORMAT).getValue();
+        final long maxRowsPerFlowFile = 
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final long outputBatchSize = 
context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
         final StopWatch stopWatch = new StopWatch(true);
 
-        if (fileToProcess == null) {
-            fileToProcess = session.create();
-        }
-
         try {
             // The documentation for the driver recommends the session remain 
open the entire time the processor is running
             // and states that it is thread-safe. This is why 
connectionSession is not in a try-with-resources.
             final Session connectionSession = cassandraSession.get();
-            final ResultSetFuture queryFuture = 
connectionSession.executeAsync(selectQuery);
+            final ResultSet resultSet;
+
+            if (queryTimeout > 0) {
+                resultSet = connectionSession.execute(selectQuery, 
queryTimeout, TimeUnit.MILLISECONDS);
+            }else{
+                resultSet = connectionSession.execute(selectQuery);
+            }
             final AtomicLong nrOfRows = new AtomicLong(0L);
 
-            fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws 
IOException {
-                    try (final OutputStream out = new 
BufferedOutputStream(rawOut)) {
-                        logger.debug("Executing CQL query {}", new 
Object[]{selectQuery});
-                        final ResultSet resultSet;
-                        if (queryTimeout > 0) {
-                            resultSet = 
queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, 
out, queryTimeout, TimeUnit.MILLISECONDS));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                
nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, 
queryTimeout, TimeUnit.MILLISECONDS));
-                            }
-                        } else {
-                            resultSet = queryFuture.getUninterruptibly();
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, 
out, 0, null));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                
nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, 
0, null));
+            long flowFileCount = 0;
+
+            if(fileToProcess == null) {
+                fileToProcess = session.create();
+            }
+
+            while(true) {
+
+                fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws 
IOException {
+                        try {
+                            logger.debug("Executing CQL query {}", new 
Object[]{selectQuery});
+                            if (queryTimeout > 0) {
+                                if (AVRO_FORMAT.equals(outputFormat)) {
+                                    
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+                                            out, queryTimeout, 
TimeUnit.MILLISECONDS));
+                                } else if (JSON_FORMAT.equals(outputFormat)) {
+                                    
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+                                            out, charset, queryTimeout, 
TimeUnit.MILLISECONDS));
+                                }
+                            } else {
+                                if (AVRO_FORMAT.equals(outputFormat)) {
+                                    
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+                                            out, 0, null));
+                                } else if (JSON_FORMAT.equals(outputFormat)) {
+                                    
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+                                            out, charset, 0, null));
+                                }
                             }
+                        } catch (final TimeoutException | InterruptedException 
| ExecutionException e) {
+                            throw new ProcessException(e);
                         }
-
-                    } catch (final TimeoutException | InterruptedException | 
ExecutionException e) {
-                        throw new ProcessException(e);
                     }
-                }
-            });
+                });
+
+                // set attribute how many rows were selected
+                fileToProcess = session.putAttribute(fileToProcess, 
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
 
-            // set attribute how many rows were selected
-            fileToProcess = session.putAttribute(fileToProcess, 
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                // set mime.type based on output format
+                fileToProcess = session.putAttribute(fileToProcess, 
CoreAttributes.MIME_TYPE.key(),
+                        JSON_FORMAT.equals(outputFormat) ? "application/json" 
: "application/avro-binary");
 
-            // set mime.type based on output format
-            fileToProcess = session.putAttribute(fileToProcess, 
CoreAttributes.MIME_TYPE.key(),
-                    JSON_FORMAT.equals(outputFormat) ? "application/json" : 
"application/avro-binary");
+                logger.info("{} contains {} records; transferring to 
'success'",
+                        new Object[]{fileToProcess, nrOfRows.get()});
+                session.getProvenanceReporter().modifyContent(fileToProcess, 
"Retrieved " + nrOfRows.get() + " rows",
+                        stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                session.transfer(fileToProcess, REL_SUCCESS);
 
-            logger.info("{} contains {} Avro records; transferring to 
'success'",
-                    new Object[]{fileToProcess, nrOfRows.get()});
-            session.getProvenanceReporter().modifyContent(fileToProcess, 
"Retrieved " + nrOfRows.get() + " rows",
-                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(fileToProcess, REL_SUCCESS);
+                if (outputBatchSize > 0) {
+                    flowFileCount++;
+
+                    if (flowFileCount == outputBatchSize) {
+                        session.commitAsync();
+                        flowFileCount = 0;
+                        fileToProcess = session.create();
+                    }
+                }
+                try {
+                    resultSet.fetchMoreResults().get();
+                } catch (Exception e) {
+                    logger.error("ExecutionException : query {} for {} due to 
{}; routing to failure",
+                            new Object[]{selectQuery, fileToProcess, e});

Review Comment:
   Agreed — I added another try-catch block. It is a bit clunky in the current 
iteration because of the repeated code. Let me know if you want me to try to 
clean that up. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to the mailing list.