Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3051#discussion_r228328178
--- Diff:
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext
context) {
@Override
public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile inputFlowFile = null;
FlowFile fileToProcess = null;
+
+ Map<String, String> attributes = null;
+
if (context.hasIncomingConnection()) {
- fileToProcess = session.get();
+ inputFlowFile = session.get();
// If we have no FlowFile, and all incoming connections are
self-loops then we can continue on.
// However, if we have no FlowFile and we have connections
coming from other Processors, then
// we know that we should run only if we have a FlowFile.
- if (fileToProcess == null && context.hasNonLoopConnection()) {
+ if (inputFlowFile == null && context.hasNonLoopConnection()) {
return;
}
+
+ attributes = inputFlowFile.getAttributes();
}
final ComponentLog logger = getLogger();
- final String selectQuery =
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
- final long queryTimeout =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+ final String selectQuery =
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+ final long queryTimeout =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
final String outputFormat =
context.getProperty(OUTPUT_FORMAT).getValue();
- final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+ final long maxRowsPerFlowFile =
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+ final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
final StopWatch stopWatch = new StopWatch(true);
- if (fileToProcess == null) {
- fileToProcess = session.create();
+ if(inputFlowFile != null){
+ session.transfer(inputFlowFile, REL_ORIGINAL);
}
try {
// The documentation for the driver recommends the session
remain open the entire time the processor is running
// and states that it is thread-safe. This is why
connectionSession is not in a try-with-resources.
final Session connectionSession = cassandraSession.get();
- final ResultSetFuture queryFuture =
connectionSession.executeAsync(selectQuery);
+ final ResultSet resultSet;
+
+ if (queryTimeout > 0) {
+ resultSet = connectionSession.execute(selectQuery,
queryTimeout, TimeUnit.MILLISECONDS);
+ }else{
+ resultSet = connectionSession.execute(selectQuery);
+ }
+
final AtomicLong nrOfRows = new AtomicLong(0L);
- fileToProcess = session.write(fileToProcess, new
OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws
IOException {
- try {
- logger.debug("Executing CQL query {}", new
Object[]{selectQuery});
- final ResultSet resultSet;
- if (queryTimeout > 0) {
- resultSet =
queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
- if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout,
TimeUnit.MILLISECONDS));
- } else if (JSON_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout,
TimeUnit.MILLISECONDS));
- }
- } else {
- resultSet = queryFuture.getUninterruptibly();
- if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
- } else if (JSON_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null));
+ do {
+ fileToProcess = session.create();
+
+ // Assuring that if we have an input FlowFile
+ // the generated output inherit the attributes
+ if(attributes != null){
+ fileToProcess =
session.putAllAttributes(fileToProcess, attributes);
--- End diff --
see my comment above, if you create fileToProcess using the inputFlowFile
as a parent, you won't need to transfer attributes this way, it will
automatically inherit them.
---