Github user aglotero commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3051#discussion_r228522688
--- Diff:
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext
context) {
@Override
public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile inputFlowFile = null;
FlowFile fileToProcess = null;
+
+ Map<String, String> attributes = null;
+
if (context.hasIncomingConnection()) {
- fileToProcess = session.get();
+ inputFlowFile = session.get();
// If we have no FlowFile, and all incoming connections are
self-loops then we can continue on.
// However, if we have no FlowFile and we have connections
coming from other Processors, then
// we know that we should run only if we have a FlowFile.
- if (fileToProcess == null && context.hasNonLoopConnection()) {
+ if (inputFlowFile == null && context.hasNonLoopConnection()) {
return;
}
+
+ attributes = inputFlowFile.getAttributes();
}
final ComponentLog logger = getLogger();
- final String selectQuery =
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
- final long queryTimeout =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+ final String selectQuery =
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+ final long queryTimeout =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
final String outputFormat =
context.getProperty(OUTPUT_FORMAT).getValue();
- final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+ final long maxRowsPerFlowFile =
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+ final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
final StopWatch stopWatch = new StopWatch(true);
- if (fileToProcess == null) {
- fileToProcess = session.create();
+ if(inputFlowFile != null){
+ session.transfer(inputFlowFile, REL_ORIGINAL);
}
try {
// The documentation for the driver recommends the session
remain open the entire time the processor is running
// and states that it is thread-safe. This is why
connectionSession is not in a try-with-resources.
final Session connectionSession = cassandraSession.get();
- final ResultSetFuture queryFuture =
connectionSession.executeAsync(selectQuery);
+ final ResultSet resultSet;
+
+ if (queryTimeout > 0) {
+ resultSet = connectionSession.execute(selectQuery,
queryTimeout, TimeUnit.MILLISECONDS);
+ }else{
+ resultSet = connectionSession.execute(selectQuery);
+ }
+
final AtomicLong nrOfRows = new AtomicLong(0L);
- fileToProcess = session.write(fileToProcess, new
OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws
IOException {
- try {
- logger.debug("Executing CQL query {}", new
Object[]{selectQuery});
- final ResultSet resultSet;
- if (queryTimeout > 0) {
- resultSet =
queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
- if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout,
TimeUnit.MILLISECONDS));
- } else if (JSON_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout,
TimeUnit.MILLISECONDS));
- }
- } else {
- resultSet = queryFuture.getUninterruptibly();
- if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
- } else if (JSON_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null));
+ do {
+ fileToProcess = session.create();
+
+ // Assuring that if we have an input FlowFile
+ // the generated output inherit the attributes
+ if(attributes != null){
+ fileToProcess =
session.putAllAttributes(fileToProcess, attributes);
--- End diff --
This is true, but the incremental commit forces us to set the attributes on
the new flow files explicitly. In my first implementation I created fileToProcess
as you suggested, but after getting some errors in the tests I realized that, to
inherit the attributes, both the parent and child flow files must reside in the same
session.
When we perform a partial commit, this is no longer true.
After the first session.commit, creating a new flow file that inherits from the
input flow file will cause an error on the next session.commit.
---