This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 3b67a8a0cd NIFI-13620 Resolved MaxWaitTime Issue in QueryCassandra (#9232)
3b67a8a0cd is described below

commit 3b67a8a0cd16b152d0104304cb436d7197aa727b
Author: Rahul937303 <[email protected]>
AuthorDate: Fri Sep 6 02:07:43 2024 +0530

    NIFI-13620 Resolved MaxWaitTime Issue in QueryCassandra (#9232)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../org/apache/nifi/processors/cassandra/QueryCassandra.java     | 9 +++++++--
 .../org/apache/nifi/processors/cassandra/QueryCassandraTest.java | 4 ++++
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
index eef2543bae..48af342fb6 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
@@ -21,6 +21,9 @@ import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.core.exceptions.QueryExecutionException;
 import com.datastax.driver.core.exceptions.QueryValidationException;
@@ -125,7 +128,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
             .name("Max Wait Time")
             .description("The maximum amount of time allowed for a running CQL select query. Must be of format "
                     + "<duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported "
-                    + "Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ")
+                    + "Time Unit, such as: millis, secs, mins, hrs, days. A value of zero means there is no limit. ")
             .defaultValue("0 seconds")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -277,7 +280,9 @@ public class QueryCassandra extends AbstractCassandraProcessor {
             final ResultSet resultSet;
 
             if (queryTimeout > 0) {
-                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+                Statement statement = new SimpleStatement(selectQuery);
+                statement.setReadTimeoutMillis(Math.toIntExact(queryTimeout));
+                resultSet = connectionSession.execute(statement);
             }else{
                 resultSet = connectionSession.execute(selectQuery);
             }
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
index c2ad29b7d6..bdf8e36222 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
@@ -25,9 +25,11 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.SniEndPoint;
+import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.core.exceptions.ReadTimeoutException;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -540,6 +542,7 @@ public class QueryCassandraTest {
                 } else {
                     when(mockSession.execute(anyString(),any(), any())).thenReturn(rs);
                     when(mockSession.execute(anyString())).thenReturn(rs);
+                    when(mockSession.execute(any(SimpleStatement.class))).thenReturn(rs);
                 }
             } catch (Exception e) {
                 fail(e.getMessage());
@@ -585,6 +588,7 @@ public class QueryCassandraTest {
                 } else {
                     when(mockSession.execute(anyString(),any(), any())).thenReturn(rs);
                     when(mockSession.execute(anyString())).thenReturn(rs);
+                    when(mockSession.execute(any(SimpleStatement.class))).thenReturn(rs);
                 }
             } catch (Exception e) {
                 fail(e.getMessage());

Reply via email to