Repository: nifi
Updated Branches:
  refs/heads/master b4810b8dd -> c6572f042


http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
new file mode 100644
index 0000000..04c4c00
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestExecuteSQLRecord {
+
+    private static final Logger LOGGER;
+
+    // slf4j's SimpleLogger is configured through system properties, so they are set here
+    // before the logger for this class is requested at the end of the initializer.
+    static {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        // NOTE(review): "nifi.io.nio" is not a fully qualified logger name; left as-is - confirm the intended package.
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+        // SimpleLogger looks up "org.slf4j.simpleLogger.log.<logger name>" using the full logger name,
+        // so the keys are derived from the class names instead of a hand-written (and truncated) package.
+        System.setProperty("org.slf4j.simpleLogger.log." + ExecuteSQLRecord.class.getName(), "debug");
+        System.setProperty("org.slf4j.simpleLogger.log." + TestExecuteSQLRecord.class.getName(), "debug");
+        LOGGER = LoggerFactory.getLogger(TestExecuteSQLRecord.class);
+    }
+
+    // Location of the embedded Derby database used by the tests.
+    static final String DB_LOCATION = "target/db";
+
+    // Select list and FROM clause shared by all three test queries; only the WHERE clause differs.
+    private static final String JOINED_SELECT = "select "
+            + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+            + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+            + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as RelCode"
+            + ", ROW_NUMBER() OVER () as rownr "
+            + " from persons PER, products PRD, relationships REL";
+
+    // Person id is supplied through the "person.id" expression-language reference.
+    static final String QUERY_WITH_EL = JOINED_SELECT + " where PER.ID = ${person.id}";
+
+    // Person id is hard-coded.
+    static final String QUERY_WITHOUT_EL = JOINED_SELECT + " where PER.ID = 10";
+
+    // Upper bounds are supplied as positional SQL parameters.
+    static final String QUERY_WITHOUT_EL_WITH_PARAMS = JOINED_SELECT + " where PER.ID < ? AND REL.ID < ?";
+
+
+    @BeforeClass
+    public static void setupClass() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    private TestRunner runner;
+
+    /**
+     * Builds a new ExecuteSQLRecord test runner before each test and registers the
+     * Derby-backed connection pool service under the identifier "dbcp".
+     *
+     * @throws InitializationException declared for the controller service registration
+     */
+    @Before
+    public void setup() throws InitializationException {
+        final DBCPService connectionPool = new DBCPServiceSimpleImpl();
+        // the service is registered without any configuration properties
+        final Map<String, String> noProperties = new HashMap<>();
+
+        runner = TestRunners.newTestRunner(ExecuteSQLRecord.class);
+        runner.addControllerService("dbcp", connectionPool, noProperties);
+        runner.enableControllerService(connectionPool);
+        runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "dbcp");
+    }
+
+    /**
+     * With an incoming connection configured but no FlowFile queued, a run must not
+     * transfer anything to the success or failure relationship.
+     */
+    @Test
+    public void testIncomingConnectionWithNoFlowFile() throws InitializationException {
+        runner.setIncomingConnection(true);
+        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM persons");
+
+        // register and enable the record writer the processor writes its results with
+        final MockRecordWriter writer = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", writer);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(writer);
+
+        runner.run();
+
+        // nothing was queued, so nothing may come out
+        runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 0);
+        runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 0);
+    }
+
+    /**
+     * With an incoming connection configured, no FlowFile queued and no SQL query
+     * property set, a run must not transfer anything to the success or failure relationship.
+     */
+    @Test
+    public void testIncomingConnectionWithNoFlowFileAndNoQuery() throws InitializationException {
+        runner.setIncomingConnection(true);
+
+        // only the record writer is configured; the query property is deliberately left unset
+        final MockRecordWriter writer = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", writer);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(writer);
+
+        runner.run();
+
+        runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 0);
+        runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 0);
+    }
+
+    /**
+     * Running the processor with neither an incoming connection nor a query configured
+     * is expected to fail with an {@link AssertionError}.
+     */
+    @Test(expected = AssertionError.class)
+    public void testNoIncomingConnectionAndNoQuery() throws InitializationException {
+        // no query property and no record writer are set in this test
+        runner.setIncomingConnection(false);
+        runner.run();
+    }
+
+    /**
+     * Runs the query from the processor property without an incoming connection and
+     * checks that the first provenance event is a RECEIVE.
+     */
+    @Test
+    public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+        runner.setIncomingConnection(false);
+        invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, false, null, true);
+
+        final ProvenanceEventType firstEventType = runner.getProvenanceEvents().get(0).getEventType();
+        assertEquals(ProvenanceEventType.RECEIVE, firstEventType);
+    }
+
+    /**
+     * Supplies the query as the content of an incoming FlowFile (the query property is not set)
+     * and checks that the first two provenance events are FORK followed by FETCH.
+     */
+    @Test
+    public void testSelectQueryInFlowFile() throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, true, null, false);
+
+        final ProvenanceEventType firstEventType = runner.getProvenanceEvents().get(0).getEventType();
+        assertEquals(ProvenanceEventType.FORK, firstEventType);
+
+        final ProvenanceEventType secondEventType = runner.getProvenanceEvents().get(1).getEventType();
+        assertEquals(ProvenanceEventType.FETCH, secondEventType);
+    }
+
+    @Test
+    public void testMaxRowsPerFlowFile() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" 
+ i + ", 1, 1)");
+        }
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(AbstractExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(AbstractExecuteSQL.OUTPUT_BATCH_SIZE, "0");
+        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM 
TEST_NULL_INT");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 
200);
+        runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 0);
+        
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, 
FragmentAttributes.FRAGMENT_INDEX.key());
+        
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, 
FragmentAttributes.FRAGMENT_ID.key());
+        
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, 
FragmentAttributes.FRAGMENT_COUNT.key());
+
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0);
+
+        
firstFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "5");
+        firstFlowFile.assertAttributeEquals("record.count", "5");
+        firstFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"text/plain"); // MockRecordWriter has text/plain MIME type
+        
firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), 
"0");
+        
firstFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULTSET_INDEX, "0");
+
+        MockFlowFile lastFlowFile = 
runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(199);
+
+        
lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "5");
+        lastFlowFile.assertAttributeEquals("record.count", "5");
+        lastFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"text/plain"); // MockRecordWriter has text/plain MIME type
+        
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), 
"199");
+        lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULTSET_INDEX, 
"0");
+    }
+
+    @Test
+    public void testInsertStatementCreatesFlowFile() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "insert into 
TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 
1);
+        
runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT,
 "0");
+    }
+
+    @Test
+    public void testNoRowsStatementCreatesEmptyFlowFile() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.enqueue("Hello".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
+        firstFlowFile.assertContentEquals("");
+    }
+
+    @Test
+    public void testWithSqlException() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NO_ROWS");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NO_ROWS (id integer)");
+
+        runner.setIncomingConnection(false);
+        // Try a valid SQL statement that will generate an error (val1 does 
not exist, e.g.)
+        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 
FROM TEST_NO_ROWS");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.run();
+
+        //No incoming flow file containing a query, and an exception causes no 
outbound flowfile.
+        // There should be no flow files on either relationship
+        runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_FAILURE, 
0);
+        runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 
0);
+    }
+
+    public void invokeOnTriggerRecords(final Integer queryTimeout, final 
String query, final boolean incomingFlowFile, final Map<String, String> attrs, 
final boolean setQueryProperty)
+            throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
+
+        if (queryTimeout != null) {
+            runner.setProperty(AbstractExecuteSQL.QUERY_TIMEOUT, 
queryTimeout.toString() + " secs");
+        }
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100);
+        LOGGER.info("test data loaded");
+
+        // ResultSet size will be 1x200x100 = 20 000 rows
+        // because of where PER.ID = ${person.id}
+        final int nrOfRows = 20000;
+
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+
+        if (incomingFlowFile) {
+            // incoming FlowFile content is not used, but attributes are used
+            final Map<String, String> attributes = (attrs == null) ? new 
HashMap<>() : attrs;
+            attributes.put("person.id", "10");
+            if (!setQueryProperty) {
+                runner.enqueue(query.getBytes(), attributes);
+            } else {
+                runner.enqueue("Hello".getBytes(), attributes);
+            }
+        }
+
+        if (setQueryProperty) {
+            runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, query);
+        }
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 
1);
+        
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, 
AbstractExecuteSQL.RESULT_QUERY_DURATION);
+        
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, 
AbstractExecuteSQL.RESULT_QUERY_EXECUTION_TIME);
+        
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, 
AbstractExecuteSQL.RESULT_QUERY_FETCH_TIME);
+        
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, 
AbstractExecuteSQL.RESULT_ROW_COUNT);
+
+        final List<MockFlowFile> flowfiles = 
runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS);
+        final long executionTime = 
Long.parseLong(flowfiles.get(0).getAttribute(AbstractExecuteSQL.RESULT_QUERY_EXECUTION_TIME));
+        final long fetchTime = 
Long.parseLong(flowfiles.get(0).getAttribute(AbstractExecuteSQL.RESULT_QUERY_FETCH_TIME));
+        final long durationTime = 
Long.parseLong(flowfiles.get(0).getAttribute(AbstractExecuteSQL.RESULT_QUERY_DURATION));
+        assertEquals(durationTime, fetchTime + executionTime);
+    }
+
+
+    /**
+     * Simple implementation only for ExecuteSQLRecord processor testing. Every call to
+     * {@link #getConnection()} opens a new connection to the embedded Derby database at
+     * {@code DB_LOCATION}, creating the database if it does not exist.
+     */
+    class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        /**
+         * @return a new connection to the embedded Derby test database
+         * @throws ProcessException if the driver cannot be loaded or the connection cannot be opened;
+         *                          the underlying exception is attached as the cause
+         */
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+            } catch (final Exception e) {
+                // keep the original exception as the cause so its stack trace is not lost
+                throw new ProcessException("getConnection failed: " + e, e);
+            }
+        }
+    }
+
+}

Reply via email to