This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 3b67a8a0cd NIFI-13620 Resolved MaxWaitTime Issue in QueryCassandra
(#9232)
3b67a8a0cd is described below
commit 3b67a8a0cd16b152d0104304cb436d7197aa727b
Author: Rahul937303 <[email protected]>
AuthorDate: Fri Sep 6 02:07:43 2024 +0530
NIFI-13620 Resolved MaxWaitTime Issue in QueryCassandra (#9232)
Signed-off-by: David Handermann <[email protected]>
---
.../org/apache/nifi/processors/cassandra/QueryCassandra.java | 9 +++++++--
.../org/apache/nifi/processors/cassandra/QueryCassandraTest.java | 4 ++++
2 files changed, 11 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
index eef2543bae..48af342fb6 100644
---
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
@@ -21,6 +21,9 @@ import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
@@ -125,7 +128,7 @@ public class QueryCassandra extends
AbstractCassandraProcessor {
.name("Max Wait Time")
.description("The maximum amount of time allowed for a running CQL
select query. Must be of format "
+ "<duration> <TimeUnit> where <duration> is a
non-negative integer and TimeUnit is a supported "
- + "Time Unit, such as: nanos, millis, secs, mins, hrs,
days. A value of zero means there is no limit. ")
+ + "Time Unit, such as: millis, secs, mins, hrs, days. A
value of zero means there is no limit. ")
.defaultValue("0 seconds")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -277,7 +280,9 @@ public class QueryCassandra extends
AbstractCassandraProcessor {
final ResultSet resultSet;
if (queryTimeout > 0) {
- resultSet = connectionSession.execute(selectQuery,
queryTimeout, TimeUnit.MILLISECONDS);
+ Statement statement = new SimpleStatement(selectQuery);
+ statement.setReadTimeoutMillis(Math.toIntExact(queryTimeout));
+ resultSet = connectionSession.execute(statement);
}else{
resultSet = connectionSession.execute(selectQuery);
}
diff --git
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
index c2ad29b7d6..bdf8e36222 100644
---
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
+++
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
@@ -25,9 +25,11 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SniEndPoint;
+import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.ReadTimeoutException;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.nifi.processor.exception.ProcessException;
@@ -540,6 +542,7 @@ public class QueryCassandraTest {
} else {
when(mockSession.execute(anyString(),any(),
any())).thenReturn(rs);
when(mockSession.execute(anyString())).thenReturn(rs);
+
when(mockSession.execute(any(SimpleStatement.class))).thenReturn(rs);
}
} catch (Exception e) {
fail(e.getMessage());
@@ -585,6 +588,7 @@ public class QueryCassandraTest {
} else {
when(mockSession.execute(anyString(),any(),
any())).thenReturn(rs);
when(mockSession.execute(anyString())).thenReturn(rs);
+
when(mockSession.execute(any(SimpleStatement.class))).thenReturn(rs);
}
} catch (Exception e) {
fail(e.getMessage());