Github user aglotero commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3051#discussion_r228371811
--- Diff:
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext
context) {
@Override
public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile inputFlowFile = null;
FlowFile fileToProcess = null;
+
+ Map<String, String> attributes = null;
+
if (context.hasIncomingConnection()) {
- fileToProcess = session.get();
+ inputFlowFile = session.get();
// If we have no FlowFile, and all incoming connections are
self-loops then we can continue on.
// However, if we have no FlowFile and we have connections
coming from other Processors, then
// we know that we should run only if we have a FlowFile.
- if (fileToProcess == null && context.hasNonLoopConnection()) {
+ if (inputFlowFile == null && context.hasNonLoopConnection()) {
return;
}
+
+ attributes = inputFlowFile.getAttributes();
}
final ComponentLog logger = getLogger();
- final String selectQuery =
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
- final long queryTimeout =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+ final String selectQuery =
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+ final long queryTimeout =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
final String outputFormat =
context.getProperty(OUTPUT_FORMAT).getValue();
- final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+ final long maxRowsPerFlowFile =
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+ final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
final StopWatch stopWatch = new StopWatch(true);
- if (fileToProcess == null) {
- fileToProcess = session.create();
+ if(inputFlowFile != null){
+ session.transfer(inputFlowFile, REL_ORIGINAL);
}
try {
// The documentation for the driver recommends the session
remain open the entire time the processor is running
// and states that it is thread-safe. This is why
connectionSession is not in a try-with-resources.
final Session connectionSession = cassandraSession.get();
- final ResultSetFuture queryFuture =
connectionSession.executeAsync(selectQuery);
+ final ResultSet resultSet;
+
+ if (queryTimeout > 0) {
+ resultSet = connectionSession.execute(selectQuery,
queryTimeout, TimeUnit.MILLISECONDS);
+ }else{
+ resultSet = connectionSession.execute(selectQuery);
+ }
+
final AtomicLong nrOfRows = new AtomicLong(0L);
- fileToProcess = session.write(fileToProcess, new
OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws
IOException {
- try {
- logger.debug("Executing CQL query {}", new
Object[]{selectQuery});
- final ResultSet resultSet;
- if (queryTimeout > 0) {
- resultSet =
queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
- if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout,
TimeUnit.MILLISECONDS));
- } else if (JSON_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout,
TimeUnit.MILLISECONDS));
- }
- } else {
- resultSet = queryFuture.getUninterruptibly();
- if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
- } else if (JSON_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null));
+ do {
+ fileToProcess = session.create();
+
+ // Assuring that if we have an input FlowFile
+ // the generated output inherit the attributes
+ if(attributes != null){
+ fileToProcess =
session.putAllAttributes(fileToProcess, attributes);
+ }
+
+ fileToProcess = session.write(fileToProcess, new
OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws
IOException {
+ try {
+ logger.debug("Executing CQL query {}", new
Object[]{selectQuery});
+ if (queryTimeout > 0) {
+ if (AVRO_FORMAT.equals(outputFormat)) {
+
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+ out, queryTimeout,
TimeUnit.MILLISECONDS));
+ } else if
(JSON_FORMAT.equals(outputFormat)) {
+
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+ out, charset, queryTimeout,
TimeUnit.MILLISECONDS));
+ }
+ } else {
+ if (AVRO_FORMAT.equals(outputFormat)) {
+
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+ out, 0, null));
+ } else if
(JSON_FORMAT.equals(outputFormat)) {
+
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+ out, charset, 0, null));
+ }
}
- }
- } catch (final TimeoutException | InterruptedException
| ExecutionException e) {
- throw new ProcessException(e);
+ } catch (final TimeoutException |
InterruptedException | ExecutionException e) {
+ throw new ProcessException(e);
+ }
}
- }
- });
+ });
+
- // set attribute how many rows were selected
- fileToProcess = session.putAttribute(fileToProcess,
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+ // set attribute how many rows were selected
+ fileToProcess = session.putAttribute(fileToProcess,
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
- // set mime.type based on output format
- fileToProcess = session.putAttribute(fileToProcess,
CoreAttributes.MIME_TYPE.key(),
- JSON_FORMAT.equals(outputFormat) ? "application/json"
: "application/avro-binary");
+ // set mime.type based on output format
+ fileToProcess = session.putAttribute(fileToProcess,
CoreAttributes.MIME_TYPE.key(),
+ JSON_FORMAT.equals(outputFormat) ?
"application/json" : "application/avro-binary");
- logger.info("{} contains {} Avro records; transferring to
'success'",
- new Object[]{fileToProcess, nrOfRows.get()});
- session.getProvenanceReporter().modifyContent(fileToProcess,
"Retrieved " + nrOfRows.get() + " rows",
- stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(fileToProcess, REL_SUCCESS);
+ logger.info("{} contains {} records; transferring to
'success'",
+ new Object[]{fileToProcess, nrOfRows.get()});
+
session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " +
nrOfRows.get() + " rows",
+ stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(fileToProcess, REL_SUCCESS);
+ session.commit();
--- End diff --
The "Issue" here is that, with these two features combined, the "Retrieved"
attribute has new semantics: when "Output Batch Size" is > 0, "Retrieved" means
the number of rows in this flow file; when "Output Batch Size" is == 0,
"Retrieved" means the total number of rows returned by the query, independently
of the number of output files.
My particular use case demands these parameters be segregated, because our
database has millions of rows, and waiting for the query to end would delay
further processing; as a result, the "Retrieved" attribute loses its original
semantics. However, in the case where the user sets "Output Batch Size" = 0,
the original semantics remain unchanged.
In this PR these features are segregated, but the "Retrieved" attribute is
a little confusing.
---