This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d80a19e230 NIFI-5642: QueryCassandra processor : output FlowFiles as
soon fetch_size is reached NIFI-5642: QueryCassandra processor : output
FlowFiles as soon fetch_size is reached Fixed checkstyle error Delete build.sh
Delete local build file NIFI-5642 : letting fetch_size to control the Cassandra
data flow creating a new MAX_ROWS_PER_FLOW_FILE parameter Fixed checkstyle
error: no more import java.util.* Fixed missing imports NIFI-5642: added
REL_ORIGINAL relationship in order t [...]
d80a19e230 is described below
commit d80a19e2308ce630b079b440c7f8452363ee9939
Author: aglotero <[email protected]>
AuthorDate: Mon Oct 8 18:22:19 2018 -0300
NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size
is reached
NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size
is reached
Fixed checkstyle error
Delete build.sh
Delete local build file
NIFI-5642 : letting fetch_size to control the Cassandra data flow creating
a new MAX_ROWS_PER_FLOW_FILE parameter
Fixed checkstyle error: no more import java.util.*
Fixed missing imports
NIFI-5642: added REL_ORIGINAL relationship in order to allow incremental
commit
Addressing comments from code review
Adjustments on timestamp datatype formatting
Created the OUTPUT_BATCH_SIZE property
Code review adjustments
NIFI-5642: update after rebase
NIFI-5642: addressing PR comments
NIFI-5642: adding in integration test, fixing race condition
NIFI-5642: remove log4j2
This closes #6848
Signed-off-by: Mike Thomsen <[email protected]>
---
.../cassandra/AbstractCassandraProcessor.java | 24 +-
.../nifi/processors/cassandra/QueryCassandra.java | 444 ++++++++++++++-------
.../cassandra/CassandraQueryTestUtil.java | 31 +-
.../processors/cassandra/QueryCassandraIT.java | 163 ++++++++
.../processors/cassandra/QueryCassandraTest.java | 124 +++++-
5 files changed, 601 insertions(+), 185 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
index 0c53a35a80..002ec27baf 100644
---
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
@@ -28,14 +28,6 @@ import com.datastax.driver.core.Session;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.AuthenticationException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.net.ssl.SSLContext;
import com.datastax.driver.extras.codecs.arrays.ObjectArrayCodec;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -56,6 +48,15 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.SSLContextService;
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* AbstractCassandraProcessor is a base class for Cassandra processors and
contains logic and variables common to most
* processors integrating with Apache Cassandra.
@@ -181,7 +182,6 @@ public abstract class AbstractCassandraProcessor extends
AbstractProcessor {
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(CONSISTENCY_LEVEL);
- descriptors.add(COMPRESSION_TYPE);
descriptors.add(CHARSET);
}
@@ -209,12 +209,12 @@ public abstract class AbstractCassandraProcessor extends
AbstractProcessor {
if (connectionProviderIsSet && contactPointsIsSet) {
results.add(new ValidationResult.Builder().subject("Cassandra
configuration").valid(false).explanation("both " +
CONNECTION_PROVIDER_SERVICE.getDisplayName() +
- " and processor level Cassandra configuration cannot
be provided at the same time.").build());
+ " and processor level Cassandra configuration cannot be
provided at the same time.").build());
}
if (!connectionProviderIsSet && !contactPointsIsSet) {
results.add(new ValidationResult.Builder().subject("Cassandra
configuration").valid(false).explanation("either " +
CONNECTION_PROVIDER_SERVICE.getDisplayName() +
- " or processor level Cassandra configuration has to be
provided.").build());
+ " or processor level Cassandra configuration has to be
provided.").build());
}
return results;
@@ -224,7 +224,6 @@ public abstract class AbstractCassandraProcessor extends
AbstractProcessor {
public void onScheduled(ProcessContext context) {
final boolean connectionProviderIsSet =
context.getProperty(CONNECTION_PROVIDER_SERVICE).isSet();
- // Register codecs
registerAdditionalCodecs();
if (connectionProviderIsSet) {
@@ -386,7 +385,6 @@ public abstract class AbstractCassandraProcessor extends
AbstractProcessor {
} else if (dataType.equals(DataType.timestamp())) {
return row.getTimestamp(i);
-
} else if (dataType.equals(DataType.date())) {
return row.getDate(i);
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
index 6212082861..0dac574fc1 100644
---
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
@@ -19,7 +19,6 @@ package org.apache.nifi.processors.cassandra;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
@@ -58,7 +57,6 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
-import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
@@ -70,6 +68,7 @@ import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
@@ -123,6 +122,30 @@ public class QueryCassandra extends
AbstractCassandraProcessor {
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
+ public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new
PropertyDescriptor.Builder()
+ .name("Max Rows Per Flow File")
+ .description("The maximum number of result rows that will be
included in a single FlowFile. This will allow you to break up very large "
+ + "result sets into multiple FlowFiles. If the value
specified is zero, then all rows are returned in a single FlowFile.")
+ .defaultValue("0")
+ .required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("qdbt-output-batch-size")
+ .displayName("Output Batch Size")
+ .description("The number of output FlowFiles to queue before
committing the process session. When set to zero, the session will be committed
when all result set rows "
+ + "have been processed and the output FlowFiles are ready
for transfer to the downstream relationship. For large result sets, this can
cause a large burst of FlowFiles "
+ + "to be transferred at the end of processor execution. If
this property is set, then when the specified number of FlowFiles are ready for
transfer, then the session will "
+ + "be committed, thus releasing the FlowFiles to the
downstream relationship. NOTE: The maxvalue.* and fragment.count attributes
will not be set on FlowFiles when this "
+ + "property is set.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
public static final PropertyDescriptor OUTPUT_FORMAT = new
PropertyDescriptor.Builder()
.name("Output Format")
.description("The format to which the result rows will be
converted. If JSON is selected, the output will "
@@ -165,6 +188,8 @@ public class QueryCassandra extends
AbstractCassandraProcessor {
_propertyDescriptors.add(CQL_SELECT_QUERY);
_propertyDescriptors.add(QUERY_TIMEOUT);
_propertyDescriptors.add(FETCH_SIZE);
+ _propertyDescriptors.add(MAX_ROWS_PER_FLOW_FILE);
+ _propertyDescriptors.add(OUTPUT_BATCH_SIZE);
_propertyDescriptors.add(OUTPUT_FORMAT);
_propertyDescriptors.add(TIMESTAMP_FORMAT_PATTERN);
propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
@@ -202,6 +227,7 @@ public class QueryCassandra extends
AbstractCassandraProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
FlowFile fileToProcess = null;
+
if (context.hasIncomingConnection()) {
fileToProcess = session.get();
@@ -217,60 +243,90 @@ public class QueryCassandra extends
AbstractCassandraProcessor {
final String selectQuery =
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
final long queryTimeout =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
final String outputFormat =
context.getProperty(OUTPUT_FORMAT).getValue();
+ final long maxRowsPerFlowFile =
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+ final long outputBatchSize =
context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
final StopWatch stopWatch = new StopWatch(true);
- if (fileToProcess == null) {
- fileToProcess = session.create();
- }
-
try {
// The documentation for the driver recommends the session remain
open the entire time the processor is running
// and states that it is thread-safe. This is why
connectionSession is not in a try-with-resources.
final Session connectionSession = cassandraSession.get();
- final ResultSetFuture queryFuture =
connectionSession.executeAsync(selectQuery);
+ final ResultSet resultSet;
+
+ if (queryTimeout > 0) {
+ resultSet = connectionSession.execute(selectQuery,
queryTimeout, TimeUnit.MILLISECONDS);
+ }else{
+ resultSet = connectionSession.execute(selectQuery);
+ }
final AtomicLong nrOfRows = new AtomicLong(0L);
- fileToProcess = session.write(fileToProcess, new
OutputStreamCallback() {
- @Override
- public void process(final OutputStream rawOut) throws
IOException {
- try (final OutputStream out = new
BufferedOutputStream(rawOut)) {
- logger.debug("Executing CQL query {}", new
Object[]{selectQuery});
- final ResultSet resultSet;
- if (queryTimeout > 0) {
- resultSet =
queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
- if (AVRO_FORMAT.equals(outputFormat)) {
- nrOfRows.set(convertToAvroStream(resultSet,
out, queryTimeout, TimeUnit.MILLISECONDS));
- } else if (JSON_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset,
queryTimeout, TimeUnit.MILLISECONDS));
- }
- } else {
- resultSet = queryFuture.getUninterruptibly();
- if (AVRO_FORMAT.equals(outputFormat)) {
- nrOfRows.set(convertToAvroStream(resultSet,
out, 0, null));
- } else if (JSON_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset,
0, null));
+ long flowFileCount = 0;
+
+ if(fileToProcess == null) {
+ fileToProcess = session.create();
+ }
+
+ while(true) {
+
+ fileToProcess = session.write(fileToProcess, new
OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws
IOException {
+ try {
+ logger.debug("Executing CQL query {}", new
Object[]{selectQuery});
+ if (queryTimeout > 0) {
+ if (AVRO_FORMAT.equals(outputFormat)) {
+
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+ out, queryTimeout,
TimeUnit.MILLISECONDS));
+ } else if (JSON_FORMAT.equals(outputFormat)) {
+
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+ out, charset, queryTimeout,
TimeUnit.MILLISECONDS));
+ }
+ } else {
+ if (AVRO_FORMAT.equals(outputFormat)) {
+
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+ out, 0, null));
+ } else if (JSON_FORMAT.equals(outputFormat)) {
+
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+ out, charset, 0, null));
+ }
}
+ } catch (final TimeoutException | InterruptedException
| ExecutionException e) {
+ throw new ProcessException(e);
}
-
- } catch (final TimeoutException | InterruptedException |
ExecutionException e) {
- throw new ProcessException(e);
}
- }
- });
+ });
- // set attribute how many rows were selected
- fileToProcess = session.putAttribute(fileToProcess,
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+ // set attribute how many rows were selected
+ fileToProcess = session.putAttribute(fileToProcess,
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
- // set mime.type based on output format
- fileToProcess = session.putAttribute(fileToProcess,
CoreAttributes.MIME_TYPE.key(),
- JSON_FORMAT.equals(outputFormat) ? "application/json" :
"application/avro-binary");
+ // set mime.type based on output format
+ fileToProcess = session.putAttribute(fileToProcess,
CoreAttributes.MIME_TYPE.key(),
+ JSON_FORMAT.equals(outputFormat) ? "application/json"
: "application/avro-binary");
- logger.info("{} contains {} Avro records; transferring to
'success'",
- new Object[]{fileToProcess, nrOfRows.get()});
- session.getProvenanceReporter().modifyContent(fileToProcess,
"Retrieved " + nrOfRows.get() + " rows",
- stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(fileToProcess, REL_SUCCESS);
+ if (logger.isDebugEnabled()) {
+ logger.info("{} contains {} records; transferring to
'success'",
+ new Object[]{fileToProcess, nrOfRows.get()});
+ }
+ session.getProvenanceReporter().modifyContent(fileToProcess,
"Retrieved " + nrOfRows.get() + " rows",
+ stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(fileToProcess, REL_SUCCESS);
+
+ if (outputBatchSize > 0) {
+ flowFileCount++;
+
+ if (flowFileCount == outputBatchSize) {
+ session.commitAsync();
+ flowFileCount = 0;
+// fileToProcess = session.create();
+ }
+ }
+ resultSet.fetchMoreResults().get();
+ if (resultSet.isExhausted()) {
+ break;
+ }
+ fileToProcess = session.create();
+ }
} catch (final NoHostAvailableException nhae) {
getLogger().error("No host in the Cassandra cluster can be
contacted successfully to execute this query", nhae);
@@ -279,11 +335,16 @@ public class QueryCassandra extends
AbstractCassandraProcessor {
// cap the error limit at 10, format the messages, and don't
include the stack trace (it is displayed by the
// logger message above).
getLogger().error(nhae.getCustomMessage(10, true, false));
+ if (fileToProcess == null) {
+ fileToProcess = session.create();
+ }
fileToProcess = session.penalize(fileToProcess);
session.transfer(fileToProcess, REL_RETRY);
-
} catch (final QueryExecutionException qee) {
logger.error("Cannot execute the query with the requested
consistency level successfully", qee);
+ if (fileToProcess == null) {
+ fileToProcess = session.create();
+ }
fileToProcess = session.penalize(fileToProcess);
session.transfer(fileToProcess, REL_RETRY);
@@ -291,29 +352,64 @@ public class QueryCassandra extends
AbstractCassandraProcessor {
if (context.hasIncomingConnection()) {
logger.error("The CQL query {} is invalid due to syntax error,
authorization issue, or another "
+ "validation problem; routing {} to failure",
- new Object[]{selectQuery, fileToProcess}, qve);
+ selectQuery, fileToProcess, qve);
+
+ if (fileToProcess == null) {
+ fileToProcess = session.create();
+ }
fileToProcess = session.penalize(fileToProcess);
session.transfer(fileToProcess, REL_FAILURE);
} else {
// This can happen if any exceptions occur while setting up
the connection, statement, etc.
logger.error("The CQL query {} is invalid due to syntax error,
authorization issue, or another "
- + "validation problem", new Object[]{selectQuery},
qve);
- session.remove(fileToProcess);
+ + "validation problem", selectQuery, qve);
+ if (fileToProcess != null) {
+ session.remove(fileToProcess);
+ }
+ context.yield();
+ }
+ } catch (InterruptedException|ExecutionException ex) {
+ if (context.hasIncomingConnection()) {
+ logger.error("The CQL query {} has yielded an unknown error,
routing {} to failure",
+ selectQuery, fileToProcess, ex);
+
+ if (fileToProcess == null) {
+ fileToProcess = session.create();
+ }
+ fileToProcess = session.penalize(fileToProcess);
+ session.transfer(fileToProcess, REL_FAILURE);
+ } else {
+ // This can happen if any exceptions occur while setting up
the connection, statement, etc.
+ logger.error("The CQL query {} has run into an unknown
error.", selectQuery, ex);
+ if (fileToProcess != null) {
+ session.remove(fileToProcess);
+ }
context.yield();
}
} catch (final ProcessException e) {
if (context.hasIncomingConnection()) {
logger.error("Unable to execute CQL select query {} for {} due
to {}; routing to failure",
- new Object[]{selectQuery, fileToProcess, e});
+ selectQuery, fileToProcess, e);
+ if (fileToProcess == null) {
+ fileToProcess = session.create();
+ }
fileToProcess = session.penalize(fileToProcess);
session.transfer(fileToProcess, REL_FAILURE);
+
} else {
logger.error("Unable to execute CQL select query {} due to {}",
- new Object[]{selectQuery, e});
- session.remove(fileToProcess);
+ selectQuery, e);
+ if (fileToProcess != null) {
+ session.remove(fileToProcess);
+ }
context.yield();
}
}
+ session.commitAsync();
+ }
+
+ private void handleException() {
+
}
@@ -340,54 +436,90 @@ public class QueryCassandra extends
AbstractCassandraProcessor {
* @throws TimeoutException If a result set fetch has taken longer
than the specified timeout
* @throws ExecutionException If any error occurs during the result set
fetch
*/
- public static long convertToAvroStream(final ResultSet rs, final
OutputStream outStream,
+ public static long convertToAvroStream(final ResultSet rs, long
maxRowsPerFlowFile,
+ final OutputStream outStream,
long timeout, TimeUnit timeUnit)
throws IOException, InterruptedException, TimeoutException,
ExecutionException {
final Schema schema = createSchema(rs);
final GenericRecord rec = new GenericData.Record(schema);
-
final DatumWriter<GenericRecord> datumWriter = new
GenericDatumWriter<>(schema);
+
try (final DataFileWriter<GenericRecord> dataFileWriter = new
DataFileWriter<>(datumWriter)) {
dataFileWriter.create(schema, outStream);
- final ColumnDefinitions columnDefinitions =
rs.getColumnDefinitions();
+ ColumnDefinitions columnDefinitions = rs.getColumnDefinitions();
long nrOfRows = 0;
+ long rowsAvailableWithoutFetching =
rs.getAvailableWithoutFetching();
+
if (columnDefinitions != null) {
- do {
-
- // Grab the ones we have
- int rowsAvailableWithoutFetching =
rs.getAvailableWithoutFetching();
- if (rowsAvailableWithoutFetching == 0) {
- // Get more
- if (timeout <= 0 || timeUnit == null) {
- rs.fetchMoreResults().get();
- } else {
- rs.fetchMoreResults().get(timeout, timeUnit);
- }
+
+ // Grab the ones we have
+ if (rowsAvailableWithoutFetching == 0
+ || rowsAvailableWithoutFetching < maxRowsPerFlowFile) {
+ // Get more
+ if (timeout <= 0 || timeUnit == null) {
+ rs.fetchMoreResults().get();
+ } else {
+ rs.fetchMoreResults().get(timeout, timeUnit);
}
+ rowsAvailableWithoutFetching =
rs.getAvailableWithoutFetching();
+ }
- for (Row row : rs) {
+ if(maxRowsPerFlowFile == 0){
+ maxRowsPerFlowFile = rowsAvailableWithoutFetching;
+ }
- for (int i = 0; i < columnDefinitions.size(); i++) {
- final DataType dataType =
columnDefinitions.getType(i);
+ Row row;
+ //Iterator<Row> it = rs.iterator();
+ while(nrOfRows < maxRowsPerFlowFile){
+ try {
+ row = rs.iterator().next();
+ }catch (NoSuchElementException nsee){
+ nrOfRows -= 1;
+ break;
+ }
- if (row.isNull(i)) {
- rec.put(i, null);
- } else {
- rec.put(i, getCassandraObject(row, i,
dataType));
- }
- }
- dataFileWriter.append(rec);
- nrOfRows += 1;
+ // iterator().next() is like iterator().one() => return
null on end
+ //
https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/ResultSet.html#one--
+ if(row == null){
+ break;
+ }
+
+ for (int i = 0; i < columnDefinitions.size(); i++) {
+ final DataType dataType = columnDefinitions.getType(i);
+ if (row.isNull(i)) {
+ rec.put(i, null);
+ } else {
+ rec.put(i, getCassandraObject(row, i, dataType));
+ }
}
- } while (!rs.isFullyFetched());
+
+ dataFileWriter.append(rec);
+ nrOfRows += 1;
+ }
}
return nrOfRows;
}
}
+ private static String getFormattedDate(final Optional<ProcessContext>
context, Date value) {
+ final String dateFormatPattern = context
+ .map(_context ->
_context.getProperty(TIMESTAMP_FORMAT_PATTERN).getValue())
+ .orElse(TIMESTAMP_FORMAT_PATTERN.getDefaultValue());
+ SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern);
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ return dateFormat.format(value);
+ }
+
+ public static long convertToJsonStream(final ResultSet rs, long
maxRowsPerFlowFile,
+ final OutputStream outStream,
+ Charset charset, long timeout,
TimeUnit timeUnit)
+ throws IOException, InterruptedException, TimeoutException,
ExecutionException {
+ return convertToJsonStream(Optional.empty(), rs, maxRowsPerFlowFile,
outStream, charset, timeout, timeUnit);
+ }
+
/**
* Converts a result set into an Json object and writes it to the given
stream using the specified character set.
*
@@ -401,93 +533,108 @@ public class QueryCassandra extends
AbstractCassandraProcessor {
* @throws TimeoutException If a result set fetch has taken longer
than the specified timeout
* @throws ExecutionException If any error occurs during the result set
fetch
*/
- public static long convertToJsonStream(final ResultSet rs, final
OutputStream outStream,
- Charset charset, long timeout,
TimeUnit timeUnit)
- throws IOException, InterruptedException, TimeoutException,
ExecutionException {
- return convertToJsonStream(Optional.empty(), rs, outStream, charset,
timeout, timeUnit);
- }
-
@VisibleForTesting
- static long convertToJsonStream(final Optional<ProcessContext> context,
final ResultSet rs, final OutputStream outStream,
- Charset charset, long timeout,
TimeUnit timeUnit)
+ public static long convertToJsonStream(final Optional<ProcessContext>
context,
+ final ResultSet rs, long
maxRowsPerFlowFile,
+ final OutputStream outStream,
+ Charset charset, long timeout, TimeUnit
timeUnit)
throws IOException, InterruptedException, TimeoutException,
ExecutionException {
try {
// Write the initial object brace
outStream.write("{\"results\":[".getBytes(charset));
- final ColumnDefinitions columnDefinitions =
rs.getColumnDefinitions();
+ ColumnDefinitions columnDefinitions = rs.getColumnDefinitions();
long nrOfRows = 0;
+ long rowsAvailableWithoutFetching =
rs.getAvailableWithoutFetching();
+
if (columnDefinitions != null) {
- do {
-
- // Grab the ones we have
- int rowsAvailableWithoutFetching =
rs.getAvailableWithoutFetching();
- if (rowsAvailableWithoutFetching == 0) {
- // Get more
- if (timeout <= 0 || timeUnit == null) {
- rs.fetchMoreResults().get();
- } else {
- rs.fetchMoreResults().get(timeout, timeUnit);
- }
+
+ // Grab the ones we have
+ if (rowsAvailableWithoutFetching == 0) {
+ // Get more
+ if (timeout <= 0 || timeUnit == null) {
+ rs.fetchMoreResults().get();
+ } else {
+ rs.fetchMoreResults().get(timeout, timeUnit);
+ }
+ rowsAvailableWithoutFetching =
rs.getAvailableWithoutFetching();
+ }
+
+ if(maxRowsPerFlowFile == 0){
+ maxRowsPerFlowFile = rowsAvailableWithoutFetching;
+ }
+
+ Row row;
+ while(nrOfRows < maxRowsPerFlowFile){
+ try {
+ row = rs.iterator().next();
+ }catch (NoSuchElementException nsee){
+ nrOfRows -= 1;
+ break;
+ }
+
+ // iterator().next() is like iterator().one() => return
null on end
+ //
https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/ResultSet.html#one--
+ if(row == null){
+ break;
+ }
+
+ if (nrOfRows != 0) {
+ outStream.write(",".getBytes(charset));
}
- for (Row row : rs) {
- if (nrOfRows != 0) {
+ outStream.write("{".getBytes(charset));
+ for (int i = 0; i < columnDefinitions.size(); i++) {
+ final DataType dataType = columnDefinitions.getType(i);
+ final String colName = columnDefinitions.getName(i);
+ if (i != 0) {
outStream.write(",".getBytes(charset));
}
- outStream.write("{".getBytes(charset));
- for (int i = 0; i < columnDefinitions.size(); i++) {
- final DataType dataType =
columnDefinitions.getType(i);
- final String colName =
columnDefinitions.getName(i);
- if (i != 0) {
- outStream.write(",".getBytes(charset));
- }
- if (row.isNull(i)) {
- outStream.write(("\"" + colName + "\"" +
":null").getBytes(charset));
- } else {
- Object value = getCassandraObject(row, i,
dataType);
- String valueString;
- if (value instanceof List || value instanceof
Set) {
- boolean first = true;
- StringBuilder sb = new StringBuilder("[");
- for (Object element : ((Collection)
value)) {
- if (!first) {
- sb.append(",");
- }
- sb.append(getJsonElement(context,
element));
- first = false;
+ if (row.isNull(i)) {
+ outStream.write(("\"" + colName + "\"" +
":null").getBytes(charset));
+ } else {
+ Object value = getCassandraObject(row, i,
dataType);
+ String valueString;
+ if (value instanceof List || value instanceof Set)
{
+ boolean first = true;
+ StringBuilder sb = new StringBuilder("[");
+ for (Object element : ((Collection) value)) {
+ if (!first) {
+ sb.append(",");
}
- sb.append("]");
- valueString = sb.toString();
- } else if (value instanceof Map) {
- boolean first = true;
- StringBuilder sb = new StringBuilder("{");
- for (Object element : ((Map)
value).entrySet()) {
- Map.Entry entry = (Map.Entry) element;
- Object mapKey = entry.getKey();
- Object mapValue = entry.getValue();
-
- if (!first) {
- sb.append(",");
- }
- sb.append(getJsonElement(context,
mapKey));
- sb.append(":");
- sb.append(getJsonElement(context,
mapValue));
- first = false;
+ sb.append(getJsonElement(context,
element));
+ first = false;
+ }
+ sb.append("]");
+ valueString = sb.toString();
+ } else if (value instanceof Map) {
+ boolean first = true;
+ StringBuilder sb = new StringBuilder("{");
+ for (Object element : ((Map)
value).entrySet()) {
+ Map.Entry entry = (Map.Entry) element;
+ Object mapKey = entry.getKey();
+ Object mapValue = entry.getValue();
+
+ if (!first) {
+ sb.append(",");
}
- sb.append("}");
- valueString = sb.toString();
- } else {
- valueString = getJsonElement(context,
value);
+ sb.append(getJsonElement(context, mapKey));
+ sb.append(":");
+ sb.append(getJsonElement(context,
mapValue));
+ first = false;
}
- outStream.write(("\"" + colName + "\":"
- + valueString + "").getBytes(charset));
+ sb.append("}");
+ valueString = sb.toString();
+ } else {
+ valueString = getJsonElement(context, value);
}
+ outStream.write(("\"" + colName + "\":"
+ + valueString + "").getBytes(charset));
}
- nrOfRows += 1;
- outStream.write("}".getBytes(charset));
}
- } while (!rs.isFullyFetched());
+ nrOfRows += 1;
+ outStream.write("}".getBytes(charset));
+ }
}
return nrOfRows;
} finally {
@@ -511,15 +658,6 @@ public class QueryCassandra extends
AbstractCassandraProcessor {
}
}
- private static String getFormattedDate(final Optional<ProcessContext>
context, Date value) {
- final String dateFormatPattern = context
- .map(_context ->
_context.getProperty(TIMESTAMP_FORMAT_PATTERN).getValue())
- .orElse(TIMESTAMP_FORMAT_PATTERN.getDefaultValue());
- SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern);
- dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- return dateFormat.format(value);
- }
-
/**
* Creates an Avro schema from the given result set. The metadata (column
definitions, data types, etc.) is used
* to determine a schema for Avro.
@@ -577,4 +715,4 @@ public class QueryCassandra extends
AbstractCassandraProcessor {
}
return builder.endRecord();
}
-}
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
index d5e5a087c3..e31627aa0b 100644
---
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
@@ -22,19 +22,20 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.ListenableFuture;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.Map;
+import java.util.List;
+import java.util.HashMap;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.TimeZone;
import static org.mockito.ArgumentMatchers.any;
@@ -56,7 +57,7 @@ public class CassandraQueryTestUtil {
TEST_DATE = c.getTime();
}
- public static ResultSet createMockResultSet() throws Exception {
+ public static ResultSet createMockResultSet(boolean falseThenTrue) throws
Exception {
ResultSet resultSet = mock(ResultSet.class);
ColumnDefinitions columnDefinitions = mock(ColumnDefinitions.class);
when(columnDefinitions.size()).thenReturn(9);
@@ -106,14 +107,28 @@ public class CassandraQueryTestUtil {
}}, true, 3.0f, 4.0)
);
+ ListenableFuture future = mock(ListenableFuture.class);
+ when(future.get()).thenReturn(rows);
+ when(resultSet.fetchMoreResults()).thenReturn(future);
+
when(resultSet.iterator()).thenReturn(rows.iterator());
when(resultSet.all()).thenReturn(rows);
when(resultSet.getAvailableWithoutFetching()).thenReturn(rows.size());
when(resultSet.isFullyFetched()).thenReturn(false).thenReturn(true);
+ if(falseThenTrue) {
+ when(resultSet.isExhausted()).thenReturn(false, true);
+ }else{
+ when(resultSet.isExhausted()).thenReturn(true);
+ }
when(resultSet.getColumnDefinitions()).thenReturn(columnDefinitions);
+
return resultSet;
}
+ public static ResultSet createMockResultSet() throws Exception {
+ return createMockResultSet(true);
+ }
+
public static ResultSet createMockResultSetOneColumn() throws Exception {
ResultSet resultSet = mock(ResultSet.class);
ColumnDefinitions columnDefinitions = mock(ColumnDefinitions.class);
@@ -143,10 +158,15 @@ public class CassandraQueryTestUtil {
createRow("user2")
);
+ ListenableFuture future = mock(ListenableFuture.class);
+ when(future.get()).thenReturn(rows);
+ when(resultSet.fetchMoreResults()).thenReturn(future);
+
when(resultSet.iterator()).thenReturn(rows.iterator());
when(resultSet.all()).thenReturn(rows);
when(resultSet.getAvailableWithoutFetching()).thenReturn(rows.size());
when(resultSet.isFullyFetched()).thenReturn(false).thenReturn(true);
+ when(resultSet.isExhausted()).thenReturn(false).thenReturn(true);
when(resultSet.getColumnDefinitions()).thenReturn(columnDefinitions);
return resultSet;
}
@@ -195,3 +215,4 @@ public class CassandraQueryTestUtil {
return row;
}
}
+
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraIT.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraIT.java
new file mode 100644
index 0000000000..2bfd21f6b6
--- /dev/null
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraIT.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import com.datastax.driver.core.querybuilder.Truncate;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Testcontainers
+public class QueryCassandraIT {
+ @Container
+ private static final CassandraContainer CASSANDRA_CONTAINER = new
CassandraContainer(DockerImageName.parse("cassandra:4.1"));
+
+ private static TestRunner putCassandraTestRunner;
+ private static TestRunner queryCassandraTestRunner;
+ private static MockRecordParser recordReader;
+
+ private static Cluster cluster;
+ private static Session session;
+
+ private static final int LOAD_FLOW_FILE_SIZE = 100;
+ private static final int LOAD_FLOW_FILE_BATCH_SIZE = 10;
+
+ private static final String KEYSPACE = "sample_keyspace";
+ private static final String TABLE = "sample_table";
+
+ @BeforeAll
+ public static void setup() throws InitializationException {
+ recordReader = new MockRecordParser();
+ putCassandraTestRunner =
TestRunners.newTestRunner(PutCassandraRecord.class);
+ queryCassandraTestRunner =
TestRunners.newTestRunner(QueryCassandra.class);
+
+ InetSocketAddress contactPoint = CASSANDRA_CONTAINER.getContactPoint();
+
putCassandraTestRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY,
"reader");
+ putCassandraTestRunner.setProperty(PutCassandraRecord.CONTACT_POINTS,
contactPoint.getHostString() + ":" + contactPoint.getPort());
+ putCassandraTestRunner.setProperty(PutCassandraRecord.KEYSPACE,
KEYSPACE);
+ putCassandraTestRunner.setProperty(PutCassandraRecord.TABLE, TABLE);
+
putCassandraTestRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL,
"SERIAL");
+
putCassandraTestRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE,
"LOGGED");
+ putCassandraTestRunner.addControllerService("reader", recordReader);
+ putCassandraTestRunner.enableControllerService(recordReader);
+
+ queryCassandraTestRunner.setProperty(QueryCassandra.CONTACT_POINTS,
contactPoint.getHostName() + ":" + contactPoint.getPort());
+ queryCassandraTestRunner.setProperty(QueryCassandra.FETCH_SIZE, "10");
+ queryCassandraTestRunner.setProperty(QueryCassandra.OUTPUT_BATCH_SIZE,
"10");
+ queryCassandraTestRunner.setProperty(QueryCassandra.KEYSPACE,
KEYSPACE);
+ queryCassandraTestRunner.setProperty(QueryCassandra.CQL_SELECT_QUERY,
"select * from " + TABLE + ";");
+
+ cluster = Cluster.builder().addContactPoint(contactPoint.getHostName())
+ .withPort(contactPoint.getPort()).build();
+ session = cluster.connect();
+
+ String createKeyspace = "CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE +
" WITH replication = {'class':'SimpleStrategy','replication_factor':1};";
+ String createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." +
TABLE + "(id int PRIMARY KEY, uuid text, age int);";
+
+ session.execute(createKeyspace);
+ session.execute(createTable);
+ loadData();
+ }
+
+ private static void loadData() {
+ recordReader.addSchemaField("id", RecordFieldType.INT);
+ recordReader.addSchemaField("uuid", RecordFieldType.STRING);
+ recordReader.addSchemaField("age", RecordFieldType.INT);
+ int recordCount = 0;
+
+ for (int i = 0; i<LOAD_FLOW_FILE_SIZE; i++) {
+ for (int j = 0; j<LOAD_FLOW_FILE_BATCH_SIZE; j++) {
+ recordCount++;
+ recordReader.addRecord(recordCount,
UUID.randomUUID().toString(),
+ ThreadLocalRandom.current().nextInt(0, 101));
+ }
+ putCassandraTestRunner.enqueue("");
+ putCassandraTestRunner.run();
+ }
+
putCassandraTestRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS,
LOAD_FLOW_FILE_SIZE);
+ assertEquals(LOAD_FLOW_FILE_SIZE*LOAD_FLOW_FILE_BATCH_SIZE,
getRecordsCount());
+ }
+
+ @Test
+ public void testSimpleQuery() {
+ queryCassandraTestRunner.enqueue("");
+ queryCassandraTestRunner.run();
+ Assertions.assertEquals(LOAD_FLOW_FILE_SIZE,
queryCassandraTestRunner.getFlowFilesForRelationship(QueryCassandra.REL_SUCCESS).size());
+ queryCassandraTestRunner.clearTransferState();
+ }
+
+ @Test
+ public void testWithoutBatchSize() {
+
queryCassandraTestRunner.removeProperty(QueryCassandra.OUTPUT_BATCH_SIZE);
+ queryCassandraTestRunner.enqueue("");
+ queryCassandraTestRunner.run();
+ Assertions.assertEquals(LOAD_FLOW_FILE_SIZE,
queryCassandraTestRunner.getFlowFilesForRelationship(QueryCassandra.REL_SUCCESS).size());
+ queryCassandraTestRunner.clearTransferState();
+ }
+
+ private static int getRecordsCount() {
+ Select selectQuery = QueryBuilder.select().all().from(KEYSPACE, TABLE);
+ ResultSet result = session.execute(selectQuery);
+
+ List<Integer> resultsList = result.all()
+ .stream()
+ .map(r -> r.getInt(0))
+ .collect(Collectors.toList());
+
+ return resultsList.size();
+ }
+
+ private void dropRecords() {
+ Truncate query = QueryBuilder.truncate(KEYSPACE, TABLE);
+ session.execute(query);
+ }
+
+ @AfterAll
+ public static void shutdown() {
+ String dropKeyspace = "DROP KEYSPACE " + KEYSPACE;
+ String dropTable = "DROP TABLE IF EXISTS " + KEYSPACE + "." + TABLE;
+
+ session.execute(dropTable);
+ session.execute(dropKeyspace);
+
+ session.close();
+ cluster.close();
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
index 9e329b0f7b..36cdc3af67 100644
---
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
@@ -165,6 +165,7 @@ public class QueryCassandraTest {
@Test
public void testProcessorELConfigJsonOutput() {
+ setUpStandardProcessorConfig();
testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS,
"${hosts}");
testRunner.setProperty(QueryCassandra.CQL_SELECT_QUERY, "${query}");
testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "${pass}");
@@ -172,6 +173,7 @@ public class QueryCassandraTest {
testRunner.setProperty(AbstractCassandraProcessor.CHARSET,
"${charset}");
testRunner.setProperty(QueryCassandra.QUERY_TIMEOUT, "${timeout}");
testRunner.setProperty(QueryCassandra.FETCH_SIZE, "${fetch}");
+ testRunner.setProperty(QueryCassandra.MAX_ROWS_PER_FLOW_FILE,
"${max-rows-per-flow}");
testRunner.setIncomingConnection(false);
testRunner.assertValid();
@@ -181,6 +183,7 @@ public class QueryCassandraTest {
testRunner.setVariable("charset", "UTF-8");
testRunner.setVariable("timeout", "30 sec");
testRunner.setVariable("fetch", "0");
+ testRunner.setVariable("max-rows-per-flow", "0");
// Test JSON output
testRunner.setProperty(QueryCassandra.OUTPUT_FORMAT,
QueryCassandra.JSON_FORMAT);
@@ -216,7 +219,7 @@ public class QueryCassandraTest {
}
@Test
- public void testProcessorEmptyFlowFileAndExceptions() {
+ public void testProcessorEmptyFlowFile() {
setUpStandardProcessorConfig();
// Run with empty flowfile
@@ -224,36 +227,76 @@ public class QueryCassandraTest {
processor.setExceptionToThrow(null);
testRunner.enqueue("".getBytes());
testRunner.run(1, true, true);
- testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_SUCCESS,
1);
+ testRunner.assertTransferCount(QueryCassandra.REL_SUCCESS, 1);
+ testRunner.clearTransferState();
+ }
+
+ @Test
+ public void testProcessorEmptyFlowFileMaxRowsPerFlowFileEqOne() {
+
+ processor = new MockQueryCassandraTwoRounds();
+ testRunner = TestRunners.newTestRunner(processor);
+
+ setUpStandardProcessorConfig();
+
+ testRunner.setIncomingConnection(true);
+ testRunner.setProperty(QueryCassandra.MAX_ROWS_PER_FLOW_FILE, "1");
+ processor.setExceptionToThrow(null);
+ testRunner.enqueue("".getBytes());
+ testRunner.run(1, true, true);
+ testRunner.assertTransferCount(QueryCassandra.REL_SUCCESS, 2);
testRunner.clearTransferState();
+ }
+
+
+ @Test
+ public void testProcessorEmptyFlowFileAndNoHostAvailableException() {
+ setUpStandardProcessorConfig();
// Test exceptions
processor.setExceptionToThrow(new NoHostAvailableException(new
HashMap<EndPoint, Throwable>()));
testRunner.enqueue("".getBytes());
testRunner.run(1, true, true);
- testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_RETRY, 1);
+ testRunner.assertTransferCount(QueryCassandra.REL_RETRY, 1);
testRunner.clearTransferState();
+ }
+
+ @Test
+ public void
testProcessorEmptyFlowFileAndInetSocketAddressConsistencyLevelANY() {
+ setUpStandardProcessorConfig();
processor.setExceptionToThrow(
new ReadTimeoutException(new SniEndPoint(new
InetSocketAddress("localhost", 9042), ""), ConsistencyLevel.ANY, 0, 1, false));
testRunner.enqueue("".getBytes());
testRunner.run(1, true, true);
- testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_RETRY, 1);
+ testRunner.assertTransferCount(QueryCassandra.REL_RETRY, 1);
testRunner.clearTransferState();
+ }
+
+ @Test
+ public void testProcessorEmptyFlowFileAndInetSocketAddressDefault() {
+ setUpStandardProcessorConfig();
processor.setExceptionToThrow(
new InvalidQueryException(new SniEndPoint(new
InetSocketAddress("localhost", 9042), ""), "invalid query"));
testRunner.enqueue("".getBytes());
testRunner.run(1, true, true);
- testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_FAILURE,
1);
+ testRunner.assertTransferCount(QueryCassandra.REL_FAILURE, 1);
testRunner.clearTransferState();
+ }
+
+ @Test
+ public void testProcessorEmptyFlowFileAndExceptionsProcessException() {
+ setUpStandardProcessorConfig();
processor.setExceptionToThrow(new ProcessException());
testRunner.enqueue("".getBytes());
testRunner.run(1, true, true);
- testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_FAILURE,
1);
+ testRunner.assertTransferCount(QueryCassandra.REL_FAILURE, 1);
}
+ // --
+
@Test
public void testCreateSchemaOneColumn() throws Exception {
ResultSet rs = CassandraQueryTestUtil.createMockResultSetOneColumn();
@@ -264,7 +307,7 @@ public class QueryCassandraTest {
@Test
public void testCreateSchema() throws Exception {
- ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
+ ResultSet rs = CassandraQueryTestUtil.createMockResultSet(true);
Schema schema = QueryCassandra.createSchema(rs);
assertNotNull(schema);
assertEquals(Schema.Type.RECORD, schema.getType());
@@ -369,17 +412,20 @@ public class QueryCassandraTest {
@Test
public void testConvertToAvroStream() throws Exception {
+ setUpStandardProcessorConfig();
ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- long numberOfRows = QueryCassandra.convertToAvroStream(rs, baos, 0,
null);
+ long numberOfRows = QueryCassandra.convertToAvroStream(rs, 0, baos, 0,
null);
assertEquals(2, numberOfRows);
}
@Test
public void testConvertToJSONStream() throws Exception {
+ setUpStandardProcessorConfig();
ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- long numberOfRows = QueryCassandra.convertToJsonStream(rs, baos,
StandardCharsets.UTF_8, 0, null);
+ long numberOfRows = QueryCassandra.convertToJsonStream(rs, 0, baos,
StandardCharsets.UTF_8,
+ 0, null);
assertEquals(2, numberOfRows);
}
@@ -391,7 +437,7 @@ public class QueryCassandraTest {
DateFormat df = new
SimpleDateFormat(QueryCassandra.TIMESTAMP_FORMAT_PATTERN.getDefaultValue());
df.setTimeZone(TimeZone.getTimeZone("UTC"));
- long numberOfRows =
QueryCassandra.convertToJsonStream(Optional.of(testRunner.getProcessContext()),
rs, baos,
+ long numberOfRows =
QueryCassandra.convertToJsonStream(Optional.of(testRunner.getProcessContext()),
rs, 0, baos,
StandardCharsets.UTF_8, 0, null);
assertEquals(1, numberOfRows);
@@ -411,7 +457,7 @@ public class QueryCassandraTest {
DateFormat df = new SimpleDateFormat(customDateFormat);
df.setTimeZone(TimeZone.getTimeZone("UTC"));
- long numberOfRows =
QueryCassandra.convertToJsonStream(Optional.of(context), rs, baos,
StandardCharsets.UTF_8, 0, null);
+ long numberOfRows =
QueryCassandra.convertToJsonStream(Optional.of(context), rs, 0, baos,
StandardCharsets.UTF_8, 0, null);
assertEquals(1, numberOfRows);
Map<String, List<Map<String, String>>> map = new
ObjectMapper().readValue(baos.toByteArray(), HashMap.class);
@@ -425,6 +471,7 @@ public class QueryCassandraTest {
testRunner.setProperty(QueryCassandra.CQL_SELECT_QUERY, "select * from
test");
testRunner.setProperty(AbstractCassandraProcessor.PASSWORD,
"password");
testRunner.setProperty(AbstractCassandraProcessor.USERNAME,
"username");
+ testRunner.setProperty(QueryCassandra.MAX_ROWS_PER_FLOW_FILE, "0");
}
/**
@@ -448,17 +495,21 @@ public class QueryCassandraTest {
Configuration config = Configuration.builder().build();
when(mockCluster.getConfiguration()).thenReturn(config);
ResultSetFuture future = mock(ResultSetFuture.class);
- ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
+ ResultSet rs =
CassandraQueryTestUtil.createMockResultSet(false);
when(future.getUninterruptibly()).thenReturn(rs);
+
try {
doReturn(rs).when(future).getUninterruptibly(anyLong(),
any(TimeUnit.class));
} catch (TimeoutException te) {
throw new IllegalArgumentException("Mocked cluster doesn't
time out");
}
+
if (exceptionToThrow != null) {
-
when(mockSession.executeAsync(anyString())).thenThrow(exceptionToThrow);
+ when(mockSession.execute(anyString(), any(),
any())).thenThrow(exceptionToThrow);
+
when(mockSession.execute(anyString())).thenThrow(exceptionToThrow);
} else {
-
when(mockSession.executeAsync(anyString())).thenReturn(future);
+ when(mockSession.execute(anyString(),any(),
any())).thenReturn(rs);
+ when(mockSession.execute(anyString())).thenReturn(rs);
}
} catch (Exception e) {
fail(e.getMessage());
@@ -469,7 +520,52 @@ public class QueryCassandraTest {
public void setExceptionToThrow(Exception e) {
this.exceptionToThrow = e;
}
+ }
+
+ private static class MockQueryCassandraTwoRounds extends
MockQueryCassandra {
+
+ private Exception exceptionToThrow = null;
+
+ @Override
+ protected Cluster createCluster(List<InetSocketAddress> contactPoints,
SSLContext sslContext,
+ String username, String password,
String compressionType) {
+ Cluster mockCluster = mock(Cluster.class);
+ try {
+ Metadata mockMetadata = mock(Metadata.class);
+ when(mockMetadata.getClusterName()).thenReturn("cluster1");
+ when(mockCluster.getMetadata()).thenReturn(mockMetadata);
+ Session mockSession = mock(Session.class);
+ when(mockCluster.connect()).thenReturn(mockSession);
+ when(mockCluster.connect(anyString())).thenReturn(mockSession);
+ Configuration config = Configuration.builder().build();
+ when(mockCluster.getConfiguration()).thenReturn(config);
+ ResultSetFuture future = mock(ResultSetFuture.class);
+ ResultSet rs =
CassandraQueryTestUtil.createMockResultSet(true);
+ when(future.getUninterruptibly()).thenReturn(rs);
+ try {
+ doReturn(rs).when(future).getUninterruptibly(anyLong(),
any(TimeUnit.class));
+ } catch (TimeoutException te) {
+ throw new IllegalArgumentException("Mocked cluster doesn't
time out");
+ }
+
+ if (exceptionToThrow != null) {
+ when(mockSession.execute(anyString(), any(),
any())).thenThrow(exceptionToThrow);
+
when(mockSession.execute(anyString())).thenThrow(exceptionToThrow);
+ } else {
+ when(mockSession.execute(anyString(),any(),
any())).thenReturn(rs);
+ when(mockSession.execute(anyString())).thenReturn(rs);
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ return mockCluster;
+ }
+
+ public void setExceptionToThrow(Exception e) {
+ this.exceptionToThrow = e;
+ }
}
}
+