Repository: nifi Updated Branches: refs/heads/master 3de0b8edf -> a53a37f9c
NIFI-4319 - Fixed ArrayIndexOutOfBoundsException in QueryCassandra Signed-off-by: Matthew Burgess <[email protected]> This closes #2112 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a53a37f9 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a53a37f9 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a53a37f9 Branch: refs/heads/master Commit: a53a37f9cabab6a3d52018cde88aceebf994c7c3 Parents: 3de0b8e Author: Pierre Villard <[email protected]> Authored: Sat Aug 26 15:51:12 2017 +0200 Committer: Matthew Burgess <[email protected]> Committed: Wed Aug 30 13:52:05 2017 -0400 ---------------------------------------------------------------------- .../processors/cassandra/QueryCassandra.java | 7 ++- .../cassandra/CassandraQueryTestUtil.java | 45 +++++++++++++++++++- .../cassandra/QueryCassandraTest.java | 8 ++++ 3 files changed, 58 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a53a37f9/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java index 7387334..52eb9e0 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java @@ -40,6 +40,7 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -263,6 +264,10 @@ public class QueryCassandra extends AbstractCassandraProcessor { // set attribute how many rows were selected fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + // set mime.type based on output format + fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), + JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary"); + logger.info("{} contains {} Avro records; transferring to 'success'", new Object[]{fileToProcess, nrOfRows.get()}); session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", @@ -510,7 +515,7 @@ public class QueryCassandra extends AbstractCassandraProcessor { final int nrOfColumns = (columnDefinitions == null ? 0 : columnDefinitions.size()); String tableName = "NiFi_Cassandra_Query_Record"; if (nrOfColumns > 0) { - String tableNameFromMeta = columnDefinitions.getTable(1); + String tableNameFromMeta = columnDefinitions.getTable(0); if (!StringUtils.isBlank(tableNameFromMeta)) { tableName = tableNameFromMeta; } http://git-wip-us.apache.org/repos/asf/nifi/blob/a53a37f9/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java index 0d5571e..dbe2e1e 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java @@ -61,7 +61,7 @@ public class CassandraQueryTestUtil { } }); - when(columnDefinitions.getTable(1)).thenReturn("users"); + when(columnDefinitions.getTable(0)).thenReturn("users"); when(columnDefinitions.getType(anyInt())).thenAnswer(new Answer<DataType>() { @@ -103,6 +103,43 @@ public class CassandraQueryTestUtil { return resultSet; } + public static ResultSet createMockResultSetOneColumn() throws Exception { + ResultSet resultSet = mock(ResultSet.class); + ColumnDefinitions columnDefinitions = mock(ColumnDefinitions.class); + when(columnDefinitions.size()).thenReturn(1); + when(columnDefinitions.getName(anyInt())).thenAnswer(new Answer<String>() { + List<String> colNames = Arrays.asList("user_id"); + @Override + public String answer(InvocationOnMock invocationOnMock) throws Throwable { + return colNames.get((Integer) invocationOnMock.getArguments()[0]); + + } + }); + + when(columnDefinitions.getTable(0)).thenReturn("users"); + + when(columnDefinitions.getType(anyInt())).thenAnswer(new Answer<DataType>() { + List<DataType> dataTypes = Arrays.asList(DataType.text()); + @Override + public DataType answer(InvocationOnMock invocationOnMock) throws Throwable { + return dataTypes.get((Integer) invocationOnMock.getArguments()[0]); + + } + }); + + List<Row> rows = Arrays.asList( + createRow("user1"), + createRow("user2") + ); + + when(resultSet.iterator()).thenReturn(rows.iterator()); + when(resultSet.all()).thenReturn(rows); + when(resultSet.getAvailableWithoutFetching()).thenReturn(rows.size()); + when(resultSet.isFullyFetched()).thenReturn(false).thenReturn(true); + when(resultSet.getColumnDefinitions()).thenReturn(columnDefinitions); + return resultSet; + } + public static Row createRow(String user_id, String first_name, String last_name, Set<String> emails, List<String> top_places, Map<Date, String> todo, boolean registered, float scale, double metric) { @@ -119,4 +156,10 @@ public class CassandraQueryTestUtil { return row; } + + public static Row createRow(String user_id) { + Row row = mock(Row.class); + when(row.getString(0)).thenReturn(user_id); + return row; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/a53a37f9/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java index 5cd54e9..83110c3 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java @@ -241,6 +241,14 @@ public class QueryCassandraTest { } @Test + public void testCreateSchemaOneColumn() throws Exception { + ResultSet rs = CassandraQueryTestUtil.createMockResultSetOneColumn(); + Schema schema = QueryCassandra.createSchema(rs); + assertNotNull(schema); + assertEquals(schema.getName(), "users"); + } + + @Test public void testCreateSchema() throws Exception { ResultSet rs = CassandraQueryTestUtil.createMockResultSet(); Schema schema = QueryCassandra.createSchema(rs);
