GitHub user aglotero commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3051#discussion_r237965549
--- Diff:
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext
context) {
@Override
public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile inputFlowFile = null;
FlowFile fileToProcess = null;
+
+ Map<String, String> attributes = null;
+
if (context.hasIncomingConnection()) {
- fileToProcess = session.get();
+ inputFlowFile = session.get();
// If we have no FlowFile, and all incoming connections are
self-loops then we can continue on.
// However, if we have no FlowFile and we have connections
coming from other Processors, then
// we know that we should run only if we have a FlowFile.
- if (fileToProcess == null && context.hasNonLoopConnection()) {
+ if (inputFlowFile == null && context.hasNonLoopConnection()) {
return;
}
+
+ attributes = inputFlowFile.getAttributes();
}
final ComponentLog logger = getLogger();
- final String selectQuery =
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
- final long queryTimeout =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+ final String selectQuery =
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+ final long queryTimeout =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
final String outputFormat =
context.getProperty(OUTPUT_FORMAT).getValue();
- final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+ final long maxRowsPerFlowFile =
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+ final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
final StopWatch stopWatch = new StopWatch(true);
- if (fileToProcess == null) {
- fileToProcess = session.create();
+ if(inputFlowFile != null){
+ session.transfer(inputFlowFile, REL_ORIGINAL);
}
try {
// The documentation for the driver recommends the session
remain open the entire time the processor is running
// and states that it is thread-safe. This is why
connectionSession is not in a try-with-resources.
final Session connectionSession = cassandraSession.get();
- final ResultSetFuture queryFuture =
connectionSession.executeAsync(selectQuery);
+ final ResultSet resultSet;
+
+ if (queryTimeout > 0) {
+ resultSet = connectionSession.execute(selectQuery,
queryTimeout, TimeUnit.MILLISECONDS);
+ }else{
+ resultSet = connectionSession.execute(selectQuery);
+ }
+
final AtomicLong nrOfRows = new AtomicLong(0L);
- fileToProcess = session.write(fileToProcess, new
OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws
IOException {
- try {
- logger.debug("Executing CQL query {}", new
Object[]{selectQuery});
- final ResultSet resultSet;
- if (queryTimeout > 0) {
- resultSet =
queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
- if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout,
TimeUnit.MILLISECONDS));
- } else if (JSON_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout,
TimeUnit.MILLISECONDS));
- }
- } else {
- resultSet = queryFuture.getUninterruptibly();
- if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
- } else if (JSON_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null));
+ do {
+ fileToProcess = session.create();
+
+ // Assuring that if we have an input FlowFile
+ // the generated output inherit the attributes
+ if(attributes != null){
+ fileToProcess =
session.putAllAttributes(fileToProcess, attributes);
+ }
+
+ fileToProcess = session.write(fileToProcess, new
OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws
IOException {
+ try {
+ logger.debug("Executing CQL query {}", new
Object[]{selectQuery});
+ if (queryTimeout > 0) {
+ if (AVRO_FORMAT.equals(outputFormat)) {
+
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+ out, queryTimeout,
TimeUnit.MILLISECONDS));
+ } else if
(JSON_FORMAT.equals(outputFormat)) {
+
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+ out, charset, queryTimeout,
TimeUnit.MILLISECONDS));
+ }
+ } else {
+ if (AVRO_FORMAT.equals(outputFormat)) {
+
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+ out, 0, null));
+ } else if
(JSON_FORMAT.equals(outputFormat)) {
+
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+ out, charset, 0, null));
+ }
}
- }
- } catch (final TimeoutException | InterruptedException
| ExecutionException e) {
- throw new ProcessException(e);
+ } catch (final TimeoutException |
InterruptedException | ExecutionException e) {
+ throw new ProcessException(e);
+ }
}
- }
- });
+ });
+
- // set attribute how many rows were selected
- fileToProcess = session.putAttribute(fileToProcess,
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+ // set attribute how many rows were selected
+ fileToProcess = session.putAttribute(fileToProcess,
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
- // set mime.type based on output format
- fileToProcess = session.putAttribute(fileToProcess,
CoreAttributes.MIME_TYPE.key(),
- JSON_FORMAT.equals(outputFormat) ? "application/json"
: "application/avro-binary");
+ // set mime.type based on output format
+ fileToProcess = session.putAttribute(fileToProcess,
CoreAttributes.MIME_TYPE.key(),
+ JSON_FORMAT.equals(outputFormat) ?
"application/json" : "application/avro-binary");
- logger.info("{} contains {} Avro records; transferring to
'success'",
- new Object[]{fileToProcess, nrOfRows.get()});
- session.getProvenanceReporter().modifyContent(fileToProcess,
"Retrieved " + nrOfRows.get() + " rows",
- stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(fileToProcess, REL_SUCCESS);
+ logger.info("{} contains {} records; transferring to
'success'",
+ new Object[]{fileToProcess, nrOfRows.get()});
+
session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " +
nrOfRows.get() + " rows",
+ stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(fileToProcess, REL_SUCCESS);
+ session.commit();
--- End diff --
I think that I finally understand your point, @mattyb149. Thanks for the
clarifications!
---