MikeThomsen commented on code in PR #6848:
URL: https://github.com/apache/nifi/pull/6848#discussion_r1086660836
##########
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java:
##########
@@ -217,60 +243,94 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
         final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
         final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
+        final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final long outputBatchSize = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
         final StopWatch stopWatch = new StopWatch(true);
-        if (fileToProcess == null) {
-            fileToProcess = session.create();
-        }
-
         try {
             // The documentation for the driver recommends the session remain open the entire time the processor is running
             // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
             final Session connectionSession = cassandraSession.get();
-            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+            final ResultSet resultSet;
+
+            if (queryTimeout > 0) {
+                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                resultSet = connectionSession.execute(selectQuery);
+            }
             final AtomicLong nrOfRows = new AtomicLong(0L);
-            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws IOException {
-                    try (final OutputStream out = new BufferedOutputStream(rawOut)) {
-                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-                        final ResultSet resultSet;
-                        if (queryTimeout > 0) {
-                            resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-                            }
-                        } else {
-                            resultSet = queryFuture.getUninterruptibly();
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, 0, null));
+            long flowFileCount = 0;
+
+            if (fileToProcess == null) {
+                fileToProcess = session.create();
+            }
+
+            while (true) {
+
+                fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        try {
+                            logger.debug("Executing CQL query {}", new Object[]{selectQuery});
+                            if (queryTimeout > 0) {
+                                if (AVRO_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile, out, queryTimeout, TimeUnit.MILLISECONDS));
+                                } else if (JSON_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
+                                }
+                            } else {
+                                if (AVRO_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile, out, 0, null));
+                                } else if (JSON_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile, out, charset, 0, null));
+                                }
                             }
+                        } catch (final TimeoutException | InterruptedException | ExecutionException e) {
+                            throw new ProcessException(e);
                         }
-
-                    } catch (final TimeoutException | InterruptedException | ExecutionException e) {
-                        throw new ProcessException(e);
                     }
-                }
-            });
+                });
+
+                // set attribute how many rows were selected
+                fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-            // set attribute how many rows were selected
-            fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                // set mime.type based on output format
+                fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
+                        JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
-            // set mime.type based on output format
-            fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
-                    JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
+                logger.info("{} contains {} records; transferring to 'success'",
+                        new Object[]{fileToProcess, nrOfRows.get()});
+                session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
+                        stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                session.transfer(fileToProcess, REL_SUCCESS);
-            logger.info("{} contains {} Avro records; transferring to 'success'",
-                    new Object[]{fileToProcess, nrOfRows.get()});
-            session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
-                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(fileToProcess, REL_SUCCESS);
+                if (outputBatchSize > 0) {
+                    flowFileCount++;
+
+                    if (flowFileCount == outputBatchSize) {
+                        session.commitAsync();
+                        flowFileCount = 0;
+                        fileToProcess = session.create();
+                    }
+                }
+                try {
+                    resultSet.fetchMoreResults().get();
+                } catch (Exception e) {
+                    logger.error("ExecutionException : query {} for {} due to {}; routing to failure",
+                            new Object[]{selectQuery, fileToProcess, e});
Review Comment:
I am not sure this try-catch block is necessary, because you want execution
to stop here if the fetch fails. Not catching the exception here would break
the loop and take you to the outer error handling.
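
To make that concrete, here is a minimal sketch of the loop tail (my illustration, not code from the PR), assuming the same ProcessException-wrapping pattern the write callback above already uses, since the checked exceptions from `get()` cannot simply propagate out of `onTrigger`:

```java
try {
    resultSet.fetchMoreResults().get();
} catch (final InterruptedException | ExecutionException e) {
    // Rethrow instead of only logging, so the paging loop stops and the
    // outer error handling in onTrigger routes the FlowFile to failure.
    throw new ProcessException(e);
}
```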
##########
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java:
##########
@@ -217,60 +243,94 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
         final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
         final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
+        final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final long outputBatchSize = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
         final StopWatch stopWatch = new StopWatch(true);
-        if (fileToProcess == null) {
-            fileToProcess = session.create();
-        }
-
         try {
             // The documentation for the driver recommends the session remain open the entire time the processor is running
             // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
             final Session connectionSession = cassandraSession.get();
-            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+            final ResultSet resultSet;
+
+            if (queryTimeout > 0) {
+                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                resultSet = connectionSession.execute(selectQuery);
+            }
             final AtomicLong nrOfRows = new AtomicLong(0L);
-            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws IOException {
-                    try (final OutputStream out = new BufferedOutputStream(rawOut)) {
-                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-                        final ResultSet resultSet;
-                        if (queryTimeout > 0) {
-                            resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-                            }
-                        } else {
-                            resultSet = queryFuture.getUninterruptibly();
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, 0, null));
+            long flowFileCount = 0;
+
+            if (fileToProcess == null) {
+                fileToProcess = session.create();
+            }
+
+            while (true) {
+
+                fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        try {
+                            logger.debug("Executing CQL query {}", new Object[]{selectQuery});
+                            if (queryTimeout > 0) {
+                                if (AVRO_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile, out, queryTimeout, TimeUnit.MILLISECONDS));
+                                } else if (JSON_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
+                                }
+                            } else {
+                                if (AVRO_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile, out, 0, null));
+                                } else if (JSON_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile, out, charset, 0, null));
+                                }
                             }
+                        } catch (final TimeoutException | InterruptedException | ExecutionException e) {
+                            throw new ProcessException(e);
                        }
-
-                    } catch (final TimeoutException | InterruptedException | ExecutionException e) {
-                        throw new ProcessException(e);
                    }
-                }
-            });
+                });
+
+                // set attribute how many rows were selected
+                fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-            // set attribute how many rows were selected
-            fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                // set mime.type based on output format
+                fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
+                        JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
-            // set mime.type based on output format
-            fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
-                    JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
+                logger.info("{} contains {} records; transferring to 'success'",
Review Comment:
This should be `if (logger.isDebugEnabled()) { logger.debug() }`
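
Spelled out against the quoted line, the suggested guard would look roughly like this (a sketch; `logger` is the processor's ComponentLog):

```java
// Demoting the per-FlowFile message to debug and guarding it keeps it out
// of the info-level logs and skips argument construction when debug is off.
if (logger.isDebugEnabled()) {
    logger.debug("{} contains {} records; transferring to 'success'",
            new Object[]{fileToProcess, nrOfRows.get()});
}
```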
##########
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java:
##########
@@ -217,60 +243,94 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
         final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
         final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
+        final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final long outputBatchSize = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
         final StopWatch stopWatch = new StopWatch(true);
-        if (fileToProcess == null) {
-            fileToProcess = session.create();
-        }
-
         try {
             // The documentation for the driver recommends the session remain open the entire time the processor is running
             // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
             final Session connectionSession = cassandraSession.get();
-            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+            final ResultSet resultSet;
+
+            if (queryTimeout > 0) {
+                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                resultSet = connectionSession.execute(selectQuery);
+            }
             final AtomicLong nrOfRows = new AtomicLong(0L);
-            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws IOException {
-                    try (final OutputStream out = new BufferedOutputStream(rawOut)) {
-                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-                        final ResultSet resultSet;
-                        if (queryTimeout > 0) {
-                            resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-                            }
-                        } else {
-                            resultSet = queryFuture.getUninterruptibly();
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, 0, null));
+            long flowFileCount = 0;
+
+            if (fileToProcess == null) {
+                fileToProcess = session.create();
+            }
+
+            while (true) {
+
+                fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        try {
+                            logger.debug("Executing CQL query {}", new Object[]{selectQuery});
+                            if (queryTimeout > 0) {
+                                if (AVRO_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile, out, queryTimeout, TimeUnit.MILLISECONDS));
+                                } else if (JSON_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
+                                }
+                            } else {
+                                if (AVRO_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile, out, 0, null));
+                                } else if (JSON_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile, out, charset, 0, null));
+                                }
                             }
+                        } catch (final TimeoutException | InterruptedException | ExecutionException e) {
+                            throw new ProcessException(e);
                        }
-
-                    } catch (final TimeoutException | InterruptedException | ExecutionException e) {
-                        throw new ProcessException(e);
                    }
-                }
-            });
+                });
+
+                // set attribute how many rows were selected
+                fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-            // set attribute how many rows were selected
-            fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                // set mime.type based on output format
+                fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
+                        JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
-            // set mime.type based on output format
-            fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
-                    JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
+                logger.info("{} contains {} records; transferring to 'success'",
+                        new Object[]{fileToProcess, nrOfRows.get()});
+                session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
+                        stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                session.transfer(fileToProcess, REL_SUCCESS);
-            logger.info("{} contains {} Avro records; transferring to 'success'",
-                    new Object[]{fileToProcess, nrOfRows.get()});
-            session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
-                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(fileToProcess, REL_SUCCESS);
+                if (outputBatchSize > 0) {
+                    flowFileCount++;
+
+                    if (flowFileCount == outputBatchSize) {
+                        session.commitAsync();
+                        flowFileCount = 0;
+                        fileToProcess = session.create();
+                    }
+                }
+                try {
+                    resultSet.fetchMoreResults().get();
+                } catch (Exception e) {
+                    logger.error("ExecutionException : query {} for {} due to {}; routing to failure",
+                            new Object[]{selectQuery, fileToProcess, e});
+                }
+                if (!resultSet.isExhausted()) {
Review Comment:
It would be better to have this as "if exhausted, stop."
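In other words, something like this sketch, which exits the paging loop early instead of wrapping the remaining loop body in a negated condition:

```java
// "If exhausted, stop": the early break reads more clearly than nesting
// the rest of the while (true) body inside an if (!resultSet.isExhausted()).
if (resultSet.isExhausted()) {
    break;
}
```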
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]