This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c2fe7f0554 NIFI-15133 Added Reusable Test Database Pool for Standard
Processors
c2fe7f0554 is described below
commit c2fe7f05547ef51e93fada9567466cd4cfa14fd0
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Oct 22 16:27:38 2025 -0500
NIFI-15133 Added Reusable Test Database Pool for Standard Processors
Signed-off-by: Pierre Villard <[email protected]>
This closes #10457.
---
.../EmbeddedDatabaseConnectionService.java | 84 ++++
.../standard/QueryDatabaseTableRecordIT.java | 58 ---
.../standard/QueryDatabaseTableRecordTest.java | 426 +++++++--------------
.../standard/TestListDatabaseTables.java | 233 ++++-------
.../standard/TestUpdateDatabaseTable.java | 133 +++----
5 files changed, 349 insertions(+), 585 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/EmbeddedDatabaseConnectionService.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/EmbeddedDatabaseConnectionService.java
new file mode 100644
index 0000000000..aae8d88de5
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/EmbeddedDatabaseConnectionService.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+
+import java.io.Closeable;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Objects;
+import java.util.UUID;
+
+public class EmbeddedDatabaseConnectionService extends
AbstractControllerService implements DBCPService, Closeable {
+
+ private final String connectionUrl;
+
+ private final String shutdownUrl;
+
+ // Error Field for ignoring Derby logs
+ public static final OutputStream ERROR_FIELD;
+
+ static {
+ ERROR_FIELD = new OutputStream() {
+ @Override
+ public void write(final int character) {
+
+ }
+ };
+
+ System.setProperty("derby.stream.error.field",
"org.apache.nifi.processors.standard.EmbeddedDatabaseConnectionService.ERROR_FIELD");
+ }
+
+ public EmbeddedDatabaseConnectionService(final Path databaseLocation) {
+ Objects.requireNonNull(databaseLocation, "Database Location required");
+
+ final Path serviceDirectory =
databaseLocation.resolve(UUID.randomUUID().toString());
+ connectionUrl =
"jdbc:derby:%s;create=true".formatted(serviceDirectory);
+ shutdownUrl =
"jdbc:derby:%s;shutdown=true".formatted(serviceDirectory);
+
+ try {
+ Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+ } catch (final ClassNotFoundException e) {
+ throw new IllegalStateException("Apache Derby EmbeddedDriver not
found", e);
+ }
+ }
+
+ @Override
+ public Connection getConnection() {
+ try {
+ return DriverManager.getConnection(connectionUrl);
+ } catch (final SQLException e) {
+ throw new IllegalStateException("Apache Derby Connection failed
[%s]".formatted(connectionUrl), e);
+ }
+ }
+
+ @OnDisabled
+ @Override
+ public void close() {
+ try {
+ DriverManager.getConnection(shutdownUrl);
+ } catch (final SQLException ignored) {
+
+ }
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java
deleted file mode 100644
index 5c8ad9ed6c..0000000000
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import org.apache.nifi.dbcp.DBCPConnectionPool;
-import org.apache.nifi.dbcp.utils.DBCPProperties;
-import org.apache.nifi.reporting.InitializationException;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.testcontainers.postgresql.PostgreSQLContainer;
-
-public class QueryDatabaseTableRecordIT extends QueryDatabaseTableRecordTest {
- private static PostgreSQLContainer postgres;
-
- @BeforeAll
- public static void setupBeforeClass() {
- postgres = new
PostgreSQLContainer("postgres:latest").withInitScript("PutDatabaseRecordIT/create-person-table.sql");
- postgres.start();
- }
-
- @AfterAll
- public static void cleanUpAfterClass() {
- if (postgres != null) {
- postgres.close();
- postgres = null;
- }
- }
-
- @Override
- public String getDatabaseType() {
- return "PostgreSQL";
- }
-
- @Override
- public void createDbcpControllerService() throws InitializationException {
- final DBCPConnectionPool connectionPool = new DBCPConnectionPool();
- runner.addControllerService("dbcp", connectionPool);
- runner.setProperty(connectionPool, DBCPProperties.DATABASE_URL,
postgres.getJdbcUrl());
- runner.setProperty(connectionPool, DBCPProperties.DB_USER,
postgres.getUsername());
- runner.setProperty(connectionPool, DBCPProperties.DB_PASSWORD,
postgres.getPassword());
- runner.setProperty(connectionPool, DBCPProperties.DB_DRIVERNAME,
postgres.getDriverClassName());
- runner.enableControllerService(connectionPool);
- }
-}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
index 6e7c35719f..059cbcd28b 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
@@ -18,8 +18,6 @@ package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
@@ -27,92 +25,62 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.db.JdbcProperties;
-import org.apache.nifi.util.file.FileUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
-import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-/**
- * Unit tests for the QueryDatabaseTableRecord processor
- */
-public class QueryDatabaseTableRecordTest {
+class QueryDatabaseTableRecordTest {
private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
- MockQueryDatabaseTableRecord processor;
- protected TestRunner runner;
- private final static String DB_LOCATION = "target/db_qdt";
private final static String TABLE_NAME_KEY = "tableName";
private final static String MAX_ROWS_KEY = "maxRows";
+ private static final String SERVICE_ID =
EmbeddedDatabaseConnectionService.class.getSimpleName();
- @BeforeAll
- public static void setupBeforeClass() {
- System.setProperty("derby.stream.error.file", "target/derby.log");
-
- // remove previous test database, if any
- final File dbLocation = new File(DB_LOCATION);
- try {
- FileUtils.deleteFile(dbLocation, true);
- } catch (IOException ignored) {
- // Do nothing, may not have existed
- }
- }
+ private static EmbeddedDatabaseConnectionService service;
- @AfterAll
- public static void cleanUpAfterClass() {
- try {
- DriverManager.getConnection("jdbc:derby:" + DB_LOCATION +
";shutdown=true");
- } catch (Exception ignored) {
- // Do nothing, this is what happens at Derby shutdown
- }
- // remove previous test database, if any
- final File dbLocation = new File(DB_LOCATION);
- try {
- FileUtils.deleteFile(dbLocation, true);
- } catch (IOException ignored) {
- // Do nothing, may not have existed
- }
- System.clearProperty("derby.stream.error.file");
- }
+ MockQueryDatabaseTableRecord processor;
+ protected TestRunner runner;
- public String getDatabaseType() {
- return "Generic";
+ @BeforeAll
+ static void setService(@TempDir final Path databaseLocation) {
+ service = new EmbeddedDatabaseConnectionService(databaseLocation);
}
- public void createDbcpControllerService() throws InitializationException {
- final DBCPService dbcp = new DBCPServiceSimpleImpl();
- final Map<String, String> dbcpProperties = new HashMap<>();
- runner.addControllerService("dbcp", dbcp, dbcpProperties);
- runner.enableControllerService(dbcp);
+ @AfterAll
+ static void shutdown() {
+ service.close();
}
@BeforeEach
- public void setup() throws InitializationException, IOException {
+ void setRunner() throws InitializationException, IOException {
processor = new MockQueryDatabaseTableRecord();
runner = TestRunners.newTestRunner(processor);
- createDbcpControllerService();
- runner.setProperty(QueryDatabaseTableRecord.DBCP_SERVICE, "dbcp");
- runner.setProperty(QueryDatabaseTableRecord.DB_TYPE,
getDatabaseType());
+
+ runner.addControllerService(SERVICE_ID, service);
+ runner.enableControllerService(service);
+
+ runner.setProperty(QueryDatabaseTableRecord.DBCP_SERVICE, SERVICE_ID);
+ runner.setProperty(QueryDatabaseTableRecord.DB_TYPE, "Generic");
runner.getStateManager().clear(Scope.CLUSTER);
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
@@ -123,24 +91,40 @@ public class QueryDatabaseTableRecordTest {
}
@AfterEach
- public void teardown() throws IOException {
+ void shutdownRunner() throws IOException {
runner.getStateManager().clear(Scope.CLUSTER);
runner = null;
}
+ @AfterEach
+ void dropTables() {
+ final List<String> tables = List.of(
+ "TEST_QUERY_DB_TABLE",
+ "TEST_QUERY_DB_TABLE2",
+ "TEST_NULL_INT",
+ "TEST_NO_ROWS",
+ "TYPE_LIST"
+ );
+
+ for (final String table : tables) {
+ try (
+ Connection connection = service.getConnection();
+ Statement statement = connection.createStatement()
+ ) {
+ statement.execute("DROP TABLE %s".formatted(table));
+
+ } catch (final SQLException ignored) {
+
+ }
+ }
+ }
+
@Test
public void testAddedRows() throws SQLException, IOException {
-
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
@@ -154,14 +138,14 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
- MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
assertEquals("TEST_QUERY_DB_TABLE",
flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
- assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ assertEquals("2", flowFile.getAttribute("maxvalue.id"));
runner.setProperty(QueryDatabaseTableRecord.FETCH_SIZE, "2");
flowFile.assertAttributeEquals("record.count", "2");
flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
- assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ assertEquals("2", flowFile.getAttribute("maxvalue.id"));
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -177,8 +161,8 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
- assertEquals(flowFile.getAttribute("maxvalue.id"), "3");
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
+ assertEquals("3", flowFile.getAttribute("maxvalue.id"));
flowFile.assertAttributeEquals("record.count", "1");
// Sanity check - run again, this time no flowfiles/rows should be
transferred
@@ -194,9 +178,9 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
- assertEquals(flowFile.getAttribute("maxvalue.id"), "4");
- assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01
03:23:34.234");
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
+ assertEquals("4", flowFile.getAttribute("maxvalue.id"));
+ assertEquals("2011-01-01 03:23:34.234",
flowFile.getAttribute("maxvalue.created_on"));
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -210,7 +194,7 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (6, 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -219,7 +203,7 @@ public class QueryDatabaseTableRecordTest {
runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"name");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "7");
runner.clearTransferState();
@@ -227,7 +211,7 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (7, 'NULK', 1.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -236,7 +220,7 @@ public class QueryDatabaseTableRecordTest {
runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"scale");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "8");
runner.clearTransferState();
@@ -244,7 +228,7 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (8, 'NULK', 100.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -253,7 +237,7 @@ public class QueryDatabaseTableRecordTest {
runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"bignum");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "9");
runner.clearTransferState();
@@ -261,24 +245,17 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on, bignum) VALUES (9, 'Alice Bob', 100.0, '2012-01-01 03:23:34.234',
1)");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
}
@Test
- public void testAddedRowsAutoCommitTrue() throws SQLException, IOException
{
-
+ public void testAddedRowsAutoCommitTrue() throws SQLException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
@@ -294,29 +271,22 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
- MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
assertEquals("TEST_QUERY_DB_TABLE",
flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
- assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ assertEquals("2", flowFile.getAttribute("maxvalue.id"));
flowFile.assertAttributeEquals("record.count", "2");
flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
- assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ assertEquals("2", flowFile.getAttribute("maxvalue.id"));
flowFile.assertAttributeEquals("record.count", "1");
}
@Test
- public void testAddedRowsAutoCommitFalse() throws SQLException,
IOException {
-
+ public void testAddedRowsAutoCommitFalse() throws SQLException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
@@ -332,35 +302,22 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
- MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
assertEquals("TEST_QUERY_DB_TABLE",
flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
- assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ assertEquals("2", flowFile.getAttribute("maxvalue.id"));
flowFile.assertAttributeEquals("record.count", "2");
flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
- assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ assertEquals("2", flowFile.getAttribute("maxvalue.id"));
flowFile.assertAttributeEquals("record.count", "1");
}
@Test
public void testAddedRowsTwoTables() throws SQLException {
-
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE2");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
@@ -374,12 +331,12 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
- MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
assertEquals("TEST_QUERY_DB_TABLE",
flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
- assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ assertEquals("2", flowFile.getAttribute("maxvalue.id"));
flowFile.assertAttributeEquals("record.count", "2");
flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
- assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ assertEquals("2", flowFile.getAttribute("maxvalue.id"));
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -394,9 +351,9 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
assertEquals("TEST_QUERY_DB_TABLE2",
flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
- assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ assertEquals("2", flowFile.getAttribute("maxvalue.id"));
flowFile.assertAttributeEquals("record.count", "3");
runner.clearTransferState();
@@ -404,8 +361,8 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale,
created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
- assertEquals(flowFile.getAttribute("maxvalue.id"), "3");
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
+ assertEquals("3", flowFile.getAttribute("maxvalue.id"));
flowFile.assertAttributeEquals("record.count", "1");
// Sanity check - run again, this time no flowfiles/rows should be
transferred
@@ -417,17 +374,10 @@ public class QueryDatabaseTableRecordTest {
@Test
public void testMultiplePartitions() throws SQLException {
-
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
bucket integer not null)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0,
0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1,
0)");
@@ -439,7 +389,7 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
assertEquals("2",
-
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
+
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst().getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
);
runner.clearTransferState();
@@ -448,7 +398,7 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
assertEquals("1",
-
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
+
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst().getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
);
runner.clearTransferState();
@@ -457,7 +407,7 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
assertEquals("1",
-
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
+
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst().getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
);
runner.clearTransferState();
@@ -471,24 +421,17 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
assertEquals("1",
-
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
+
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst().getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
);
runner.clearTransferState();
}
@Test
public void testTimestampNanos() throws SQLException {
-
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.000123456')");
@@ -498,7 +441,7 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -517,7 +460,7 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.0003')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -530,15 +473,9 @@ public class QueryDatabaseTableRecordTest {
@Test
public void testWithNullIntColumn() throws SQLException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_NULL_INT");
- } catch (final SQLException ignored) {
- // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
- }
-
stmt.execute("create table TEST_NULL_INT (id integer not null, val1
integer, val2 integer, constraint my_pk primary key (id))");
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0,
NULL, 1)");
@@ -549,21 +486,15 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
-
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).assertAttributeEquals(QueryDatabaseTableRecord.RESULT_ROW_COUNT,
"2");
+
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst().assertAttributeEquals(QueryDatabaseTableRecord.RESULT_ROW_COUNT,
"2");
}
@Test
public void testWithSqlException() throws SQLException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_NO_ROWS");
- } catch (final SQLException ignored) {
- // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
- }
-
stmt.execute("create table TEST_NO_ROWS (id integer)");
runner.setIncomingConnection(false);
@@ -579,16 +510,10 @@ public class QueryDatabaseTableRecordTest {
public void testOutputBatchSize() throws SQLException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
MockFlowFile mff;
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
int rowCount = 0;
// Create larger row set
@@ -632,16 +557,10 @@ public class QueryDatabaseTableRecordTest {
public void testMaxRowsPerFlowFile() throws IOException, SQLException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
MockFlowFile mff;
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
int rowCount = 0;
//create larger row set
@@ -690,7 +609,7 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- mff =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ mff =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(0), mff.getAttribute("fragment.index"));
assertEquals("1", mff.getAttribute("fragment.count"));
@@ -705,7 +624,7 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
- mff =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ mff =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
mff.assertAttributeEquals("record.count", "9");
mff =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
mff.assertAttributeEquals("record.count", "5");
@@ -732,16 +651,10 @@ public class QueryDatabaseTableRecordTest {
public void testMaxRowsPerFlowFileWithMaxFragments() throws SQLException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
MockFlowFile mff;
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
int rowCount = 0;
//create larger row set
@@ -754,8 +667,8 @@ public class QueryDatabaseTableRecordTest {
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"ID");
runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE,
"9");
- Integer maxFragments = 3;
- runner.setProperty(QueryDatabaseTableRecord.MAX_FRAGMENTS,
maxFragments.toString());
+ int maxFragments = 3;
+ runner.setProperty(QueryDatabaseTableRecord.MAX_FRAGMENTS,
Integer.toString(maxFragments));
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS,
maxFragments);
@@ -766,7 +679,7 @@ public class QueryDatabaseTableRecordTest {
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(i),
mff.getAttribute("fragment.index"));
- assertEquals(maxFragments.toString(),
mff.getAttribute("fragment.count"));
+ assertEquals(Integer.toString(maxFragments),
mff.getAttribute("fragment.count"));
}
runner.clearTransferState();
@@ -776,15 +689,9 @@ public class QueryDatabaseTableRecordTest {
public void testInitialMaxValue() throws SQLException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
LocalDateTime dateTime = LocalDateTime.ofEpochSecond(0, 0,
ZoneOffset.UTC);
@@ -808,7 +715,7 @@ public class QueryDatabaseTableRecordTest {
// Initial run with no previous state. Should get only last 4 records
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "4");
runner.getStateManager().assertStateEquals("test_query_db_table" +
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01
00:09:00.0", Scope.CLUSTER);
runner.clearTransferState();
@@ -825,15 +732,9 @@ public class QueryDatabaseTableRecordTest {
public void testInitialMaxValueWithEL() throws SQLException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
LocalDateTime dateTime = LocalDateTime.ofEpochSecond(0, 0,
ZoneOffset.UTC);
@@ -857,7 +758,7 @@ public class QueryDatabaseTableRecordTest {
// Initial run with no previous state. Should get only last 4 records
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "4");
runner.getStateManager().assertStateEquals("test_query_db_table" +
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01
00:09:00.0", Scope.CLUSTER);
runner.clearTransferState();
@@ -875,7 +776,7 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "1");
runner.getStateManager().assertStateEquals("test_query_db_table" +
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01
00:10:00.0", Scope.CLUSTER);
runner.clearTransferState();
@@ -885,15 +786,9 @@ public class QueryDatabaseTableRecordTest {
public void testInitialLoadStrategyStartAtBeginning() throws SQLException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
LocalDateTime dateTime = LocalDateTime.ofEpochSecond(0, 0,
ZoneOffset.UTC);
@@ -915,7 +810,7 @@ public class QueryDatabaseTableRecordTest {
// Initial run with no previous state. Should get all 10 records
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "10");
runner.getStateManager().assertStateEquals("test_query_db_table" +
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01
00:09:00.0", Scope.CLUSTER);
runner.clearTransferState();
@@ -932,7 +827,7 @@ public class QueryDatabaseTableRecordTest {
public void testInitialLoadStrategyStartAtCurrentMaximumValues() throws
SQLException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
try {
@@ -977,15 +872,9 @@ public class QueryDatabaseTableRecordTest {
public void testAddedRowsCustomWhereClause() throws SQLException,
IOException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
type varchar(20), name varchar(100), scale float, created_on timestamp, bignum
bigint default 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01
03:23:34.234')");
@@ -1000,9 +889,9 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
assertEquals("TEST_QUERY_DB_TABLE",
flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
- assertEquals(flowFile.getAttribute("maxvalue.id"), "0");
+ assertEquals("0", flowFile.getAttribute("maxvalue.id"));
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -1033,9 +922,9 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01
03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
- assertEquals(flowFile.getAttribute("maxvalue.id"), "4");
- assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01
03:23:34.234");
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
+ assertEquals("4", flowFile.getAttribute("maxvalue.id"));
+ assertEquals("2011-01-01 03:23:34.234",
flowFile.getAttribute("maxvalue.created_on"));
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -1049,7 +938,7 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -1058,7 +947,7 @@ public class QueryDatabaseTableRecordTest {
runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"name");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "4");
runner.clearTransferState();
@@ -1066,7 +955,7 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (7, 'male', 'NULK', 1.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -1075,7 +964,7 @@ public class QueryDatabaseTableRecordTest {
runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"scale");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "5");
runner.clearTransferState();
@@ -1083,7 +972,7 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (8, 'male', 'NULK', 100.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -1092,7 +981,7 @@ public class QueryDatabaseTableRecordTest {
runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"bignum");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "6");
runner.clearTransferState();
@@ -1107,21 +996,9 @@ public class QueryDatabaseTableRecordTest {
public void testCustomSQL() throws SQLException, IOException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
- try {
- stmt.execute("drop table TYPE_LIST");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
type varchar(20), name varchar(100), scale float, created_on timestamp, bignum
bigint default 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01
03:23:34.234')");
@@ -1143,9 +1020,9 @@ public class QueryDatabaseTableRecordTest {
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
assertEquals("TEST_QUERY_DB_TABLE",
flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
- assertEquals(flowFile.getAttribute("maxvalue.id"), "0");
+ assertEquals("0", flowFile.getAttribute("maxvalue.id"));
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -1176,9 +1053,9 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01
03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
- assertEquals(flowFile.getAttribute("maxvalue.id"), "4");
- assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01
03:23:34.234");
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
+ assertEquals("4", flowFile.getAttribute("maxvalue.id"));
+ assertEquals("2011-01-01 03:23:34.234",
flowFile.getAttribute("maxvalue.created_on"));
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -1192,7 +1069,7 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -1201,7 +1078,7 @@ public class QueryDatabaseTableRecordTest {
runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"name");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "4");
runner.clearTransferState();
@@ -1209,7 +1086,7 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (7, 'male', 'NULK', 1.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -1218,7 +1095,7 @@ public class QueryDatabaseTableRecordTest {
runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"scale");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "5");
runner.clearTransferState();
@@ -1226,7 +1103,7 @@ public class QueryDatabaseTableRecordTest {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (8, 'male', 'NULK', 100.0, '2012-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "1");
runner.clearTransferState();
@@ -1235,7 +1112,7 @@ public class QueryDatabaseTableRecordTest {
runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"bignum");
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
- flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "6");
runner.clearTransferState();
@@ -1249,21 +1126,9 @@ public class QueryDatabaseTableRecordTest {
@Test
public void testMissingColumn() throws ProcessException, SQLException {
// load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ final Connection con = service.getConnection();
Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table TEST_QUERY_DB_TABLE");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
- try {
- stmt.execute("drop table TYPE_LIST");
- } catch (final SQLException ignored) {
- // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
- }
-
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
type varchar(20), name varchar(100), scale float, created_on timestamp, bignum
bigint default 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale,
created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01
03:23:34.234')");
@@ -1281,36 +1146,11 @@ public class QueryDatabaseTableRecordTest {
runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"ID");
runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE,
"2");
- assertThrows(AssertionError.class, () -> {
- runner.run();
- });
- }
-
- /**
- * Simple implementation only for QueryDatabaseTableRecord processor
testing.
- */
- private class DBCPServiceSimpleImpl extends AbstractControllerService
implements DBCPService {
-
- @Override
- public String getIdentifier() {
- return "dbcp";
- }
-
- @Override
- public Connection getConnection() throws ProcessException {
- try {
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION
+ ";create=true");
- } catch (final Exception e) {
- throw new ProcessException("getConnection failed: " + e);
- }
- }
+ assertThrows(AssertionError.class, runner::run);
}
@Stateful(scopes = Scope.CLUSTER, description = "Mock for
QueryDatabaseTableRecord processor")
protected static class MockQueryDatabaseTableRecord extends
QueryDatabaseTableRecord {
- void putColumnType(String colName, Integer colType) {
- columnTypeMap.put(colName, colType);
- }
+
}
}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java
index ad912fef33..40bc5c15a6 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java
@@ -16,102 +16,77 @@
*/
package org.apache.nifi.processors.standard;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.apache.nifi.util.file.FileUtils;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledOnOs;
-import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.io.TempDir;
-import java.io.File;
-import java.io.IOException;
+import java.nio.file.Path;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.SQLException;
-import java.sql.SQLNonTransientConnectionException;
import java.sql.Statement;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
-/**
- * Unit tests for ListDatabaseTables processor.
- */
-public class TestListDatabaseTables {
+class TestListDatabaseTables {
+
+ private static final String SERVICE_ID =
EmbeddedDatabaseConnectionService.class.getSimpleName();
+
+ private static EmbeddedDatabaseConnectionService service;
TestRunner runner;
- ListDatabaseTables processor;
- private final static String DB_LOCATION = "target/db_ldt";
+ ListDatabaseTables processor;
@BeforeAll
- public static void setupBeforeClass() {
- System.setProperty("derby.stream.error.file", "target/derby.log");
-
- // remove previous test database, if any
- final File dbLocation = new File(DB_LOCATION);
- try {
- FileUtils.deleteFile(dbLocation, true);
- } catch (IOException ignored) {
- // Do nothing, may not have existed
- }
+ static void setService(@TempDir final Path databaseLocation) {
+ service = new EmbeddedDatabaseConnectionService(databaseLocation);
}
@AfterAll
- public static void cleanUpAfterClass() throws Exception {
- try {
- DriverManager.getConnection("jdbc:derby:" + DB_LOCATION +
";shutdown=true");
- } catch (SQLNonTransientConnectionException ignored) {
- // Do nothing, this is what happens at Derby shutdown
- }
- // remove previous test database, if any
- final File dbLocation = new File(DB_LOCATION);
- try {
- FileUtils.deleteFile(dbLocation, true);
- } catch (IOException ignored) {
- // Do nothing, may not have existed
- }
- System.clearProperty("derby.stream.error.file");
+ static void shutdown() {
+ service.close();
}
@BeforeEach
- public void setUp() throws Exception {
+ void setRunner() throws Exception {
processor = new ListDatabaseTables();
- final DBCPService dbcp = new DBCPServiceSimpleImpl();
- final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(ListDatabaseTables.class);
- runner.addControllerService("dbcp", dbcp, dbcpProperties);
- runner.enableControllerService(dbcp);
- runner.setProperty(ListDatabaseTables.DBCP_SERVICE, "dbcp");
+ runner.addControllerService(SERVICE_ID, service);
+ runner.enableControllerService(service);
+ runner.setProperty(ListDatabaseTables.DBCP_SERVICE, SERVICE_ID);
}
- @Test
- public void testListTablesNoCount() throws Exception {
-
- // load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
- Statement stmt = con.createStatement();
+ @AfterEach
+ void dropTables() {
+ final List<String> tables = List.of(
+ "TEST_TABLE1",
+ "TEST_TABLE2"
+ );
+
+ for (final String table : tables) {
+ try (
+ Connection connection = service.getConnection();
+ Statement statement = connection.createStatement()
+ ) {
+ statement.execute("DROP TABLE %s".formatted(table));
+ } catch (final SQLException ignored) {
- try {
- stmt.execute("drop table TEST_TABLE1");
- stmt.execute("drop table TEST_TABLE2");
- } catch (final SQLException ignored) {
- // Do nothing, may not have existed
+ }
}
+ }
- stmt.execute("create table TEST_TABLE1 (id integer not null, val1
integer, val2 integer, constraint my_pk1 primary key (id))");
- stmt.execute("create table TEST_TABLE2 (id integer not null, val1
integer, val2 integer, constraint my_pk2 primary key (id))");
+ @Test
+ void testListTablesNoCount() throws SQLException {
+ createTables();
runner.run();
runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2);
@@ -122,24 +97,11 @@ public class TestListDatabaseTables {
}
@Test
- public void testListTablesWithCount() throws Exception {
+ void testListTablesWithCount() throws Exception {
runner.setProperty(ListDatabaseTables.INCLUDE_COUNT, "true");
- // load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
- Statement stmt = con.createStatement();
-
- try {
- stmt.execute("drop table TEST_TABLE1");
- stmt.execute("drop table TEST_TABLE2");
- } catch (final SQLException ignored) {
- // Do nothing, may not have existed
- }
-
- stmt.execute("create table TEST_TABLE1 (id integer not null, val1
integer, val2 integer, constraint my_pk1 primary key (id))");
- stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (0,
NULL, 1)");
- stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (1, 1,
1)");
- stmt.execute("create table TEST_TABLE2 (id integer not null, val1
integer, val2 integer, constraint my_pk2 primary key (id))");
+ createTables();
+ insertFirstTableRows();
runner.run();
runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2);
@@ -149,24 +111,11 @@ public class TestListDatabaseTables {
}
@Test
- public void testListTablesWithCountAsRecord() throws Exception {
+ void testListTablesWithCountAsRecord() throws Exception {
runner.setProperty(ListDatabaseTables.INCLUDE_COUNT, "true");
- // load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
- Statement stmt = con.createStatement();
-
- try {
- stmt.execute("drop table TEST_TABLE1");
- stmt.execute("drop table TEST_TABLE2");
- } catch (final SQLException ignored) {
- // Do nothing, may not have existed
- }
-
- stmt.execute("create table TEST_TABLE1 (id integer not null, val1
integer, val2 integer, constraint my_pk1 primary key (id))");
- stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (0,
NULL, 1)");
- stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (1, 1,
1)");
- stmt.execute("create table TEST_TABLE2 (id integer not null, val1
integer, val2 integer, constraint my_pk2 primary key (id))");
+ createTables();
+ insertFirstTableRows();
final MockRecordWriter recordWriter = new MockRecordWriter(null,
false);
runner.addControllerService("record-writer", recordWriter);
@@ -176,31 +125,18 @@ public class TestListDatabaseTables {
runner.run();
runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 1);
- final MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS).get(0);
+ final MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(
- "TEST_TABLE1,,APP,APP.TEST_TABLE1,TABLE,,2\n" +
- "TEST_TABLE2,,APP,APP.TEST_TABLE2,TABLE,,0\n");
+ """
+ TEST_TABLE1,,APP,APP.TEST_TABLE1,TABLE,,2
+ TEST_TABLE2,,APP,APP.TEST_TABLE2,TABLE,,0
+ """);
}
- @DisabledOnOs(OS.WINDOWS)
@Test
- public void testListTablesAfterRefresh() throws Exception {
- // load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
- Statement stmt = con.createStatement();
-
- try {
- stmt.execute("drop table TEST_TABLE1");
- stmt.execute("drop table TEST_TABLE2");
- } catch (final SQLException ignored) {
- // Do nothing, may not have existed
- }
-
- stmt.execute("create table TEST_TABLE1 (id integer not null, val1
integer, val2 integer, constraint my_pk1 primary key (id))");
- stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (0,
NULL, 1)");
- stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (1, 1,
1)");
- stmt.execute("create table TEST_TABLE2 (id integer not null, val1
integer, val2 integer, constraint my_pk2 primary key (id))");
- stmt.close();
+ void testListTablesAfterRefresh() throws Exception {
+ createTables();
+ insertFirstTableRows();
runner.setProperty(ListDatabaseTables.INCLUDE_COUNT, "true");
runner.setProperty(ListDatabaseTables.REFRESH_INTERVAL, "100 millis");
@@ -219,23 +155,10 @@ public class TestListDatabaseTables {
runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2);
}
- @DisabledOnOs(OS.WINDOWS)
@Test
- public void testListTablesMultipleRefresh() throws Exception {
- // load test data to database
- final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
- Statement stmt = con.createStatement();
-
- try {
- stmt.execute("drop table TEST_TABLE1");
- stmt.execute("drop table TEST_TABLE2");
- } catch (final SQLException ignored) {
- // Do nothing, may not have existed
- }
-
- stmt.execute("create table TEST_TABLE1 (id integer not null, val1
integer, val2 integer, constraint my_pk1 primary key (id))");
- stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (0,
NULL, 1)");
- stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (1, 1,
1)");
+ void testListTablesMultipleRefresh() throws Exception {
+ createFirstTable();
+ insertFirstTableRows();
runner.setProperty(ListDatabaseTables.INCLUDE_COUNT, "true");
runner.setProperty(ListDatabaseTables.REFRESH_INTERVAL, "200 millis");
@@ -243,18 +166,18 @@ public class TestListDatabaseTables {
long startTimer = System.currentTimeMillis();
runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 1);
List<MockFlowFile> results =
runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS);
- assertEquals("2",
results.get(0).getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
+ assertEquals("2",
results.getFirst().getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
runner.clearTransferState();
// Add another table immediately, the first table should not be listed
again but the second should
- stmt.execute("create table TEST_TABLE2 (id integer not null, val1
integer, val2 integer, constraint my_pk2 primary key (id))");
- stmt.close();
+ createSecondTable();
+
runner.run();
long endTimer = System.currentTimeMillis();
// Expect 1 or 2 tables (whether execution has taken longer than the
refresh time)
runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, (endTimer -
startTimer > 200) ? 2 : 1);
results =
runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS);
- assertEquals((endTimer - startTimer > 200) ? "2" : "0",
results.get(0).getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
+ assertEquals((endTimer - startTimer > 200) ? "2" : "0",
results.getFirst().getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
runner.clearTransferState();
// Now wait longer than the refresh interval and assert the refresh
has happened (i.e. the two tables are re-listed)
@@ -263,24 +186,36 @@ public class TestListDatabaseTables {
runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2);
}
- /**
- * Simple implementation only for ListDatabaseTables processor testing.
- */
- private class DBCPServiceSimpleImpl extends AbstractControllerService
implements DBCPService {
+ private void createTables() throws SQLException {
+ createFirstTable();
+ createSecondTable();
+ }
- @Override
- public String getIdentifier() {
- return "dbcp";
+ private void createFirstTable() throws SQLException {
+ try (
+ Connection connection = service.getConnection();
+ Statement statement = connection.createStatement()
+ ) {
+ statement.execute("create table TEST_TABLE1 (id integer not null,
val1 integer, val2 integer, constraint my_pk1 primary key (id))");
}
+ }
- @Override
- public Connection getConnection() throws ProcessException {
- try {
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION
+ ";create=true");
- } catch (final Exception e) {
- throw new ProcessException("getConnection failed: " + e);
- }
+ private void createSecondTable() throws SQLException {
+ try (
+ Connection connection = service.getConnection();
+ Statement statement = connection.createStatement()
+ ) {
+ statement.execute("create table TEST_TABLE2 (id integer not null,
val1 integer, val2 integer, constraint my_pk2 primary key (id))");
+ }
+ }
+
+ private void insertFirstTableRows() throws SQLException {
+ try (
+ Connection connection = service.getConnection();
+ Statement statement = connection.createStatement()
+ ) {
+ statement.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES
(0, NULL, 1)");
+ statement.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES
(1, 1, 1)");
}
}
}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java
index 73f5febe03..0afa70b51a 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java
@@ -16,9 +16,6 @@
*/
package org.apache.nifi.processors.standard;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.db.impl.DerbyDatabaseAdapter;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
@@ -28,83 +25,77 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import java.io.File;
+import java.nio.file.Path;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class TestUpdateDatabaseTable {
+class TestUpdateDatabaseTable {
private static final String createPersons = "CREATE TABLE \"persons\"
(\"id\" integer primary key, \"name\" varchar(100), \"code\" integer)";
private static final String createSchema = "CREATE SCHEMA \"testSchema\"";
-
- @TempDir
- public static File tempDir;
-
- private static String derbyErrorFile;
+ private static final String SERVICE_ID =
EmbeddedDatabaseConnectionService.class.getSimpleName();
private TestRunner runner;
- private UpdateDatabaseTable processor;
- private static DBCPService service;
+ private static EmbeddedDatabaseConnectionService service;
@BeforeAll
- public static void setupClass() throws ProcessException {
- derbyErrorFile = System.getProperty("derby.stream.error.file", "");
- System.setProperty("derby.stream.error.file", "target/derby.log");
- final File dbDir = new File(tempDir, "db");
- service = new MockDBCPService(dbDir.getAbsolutePath());
+ static void setService(@TempDir final Path databaseLocation) {
+ service = new EmbeddedDatabaseConnectionService(databaseLocation);
}
@AfterAll
- public static void restoreDefaults() {
- System.setProperty("derby.stream.error.file", derbyErrorFile);
- final File dbDir = new File(tempDir, "db");
- dbDir.deleteOnExit();
- try {
- DriverManager.getConnection("jdbc:derby:" + dbDir +
";shutdown=true");
- } catch (SQLException ignored) {
- // Ignore, most likely the DB has already been shutdown
- }
+ static void shutdown() {
+ service.close();
}
@BeforeEach
- public void setup() {
- processor = new UpdateDatabaseTable();
+ void setRunner() {
+ runner = TestRunners.newTestRunner(UpdateDatabaseTable.class);
+ }
- try (Statement s = service.getConnection().createStatement()) {
- s.execute("DROP TABLE \"persons\"");
- } catch (SQLException ignored) {
- // Ignore, table probably doesn't exist
- }
+ @AfterEach
+ void dropTables() {
+ final List<String> tables = List.of(
+ "\"persons\"",
+ "\"newTable\""
+ );
+
+ for (final String table : tables) {
+ try (
+ Connection connection = service.getConnection();
+ Statement statement = connection.createStatement()
+ ) {
+ statement.execute("DROP TABLE %s".formatted(table));
+ } catch (final SQLException ignored) {
- try (Statement s = service.getConnection().createStatement()) {
- s.execute("DROP TABLE \"newTable\"");
- } catch (SQLException ignored) {
- // Ignore, table probably doesn't exist
+ }
}
- try (Statement s = service.getConnection().createStatement()) {
- s.execute("DROP SCHEMA \"testSchema\"");
- } catch (SQLException ignored) {
- // Ignore, schema probably doesn't exist
- }
+ try (
+ Connection connection = service.getConnection();
+ Statement statement = connection.createStatement()
+ ) {
+ statement.execute("DROP SCHEMA \"testSchema\"");
+ } catch (final SQLException ignored) {
- runner = TestRunners.newTestRunner(processor);
+ }
}
@Test
@@ -126,9 +117,9 @@ public class TestUpdateDatabaseTable {
runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER,
"false");
runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS,
"true");
runner.setProperty(UpdateDatabaseTable.DB_TYPE, new
DerbyDatabaseAdapter().getName());
- runner.addControllerService("dbcp", service);
+ runner.addControllerService(SERVICE_ID, service);
runner.enableControllerService(service);
- runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
+ runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, SERVICE_ID);
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "newTable");
@@ -192,9 +183,9 @@ public class TestUpdateDatabaseTable {
runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER,
"true");
runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS,
"false");
runner.setProperty(UpdateDatabaseTable.DB_TYPE, new
DerbyDatabaseAdapter().getName());
- runner.addControllerService("dbcp", service);
+ runner.addControllerService(SERVICE_ID, service);
runner.enableControllerService(service);
- runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
+ runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, SERVICE_ID);
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "persons");
@@ -260,9 +251,9 @@ public class TestUpdateDatabaseTable {
runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER,
"true");
runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS,
"false");
runner.setProperty(UpdateDatabaseTable.DB_TYPE, new
DerbyDatabaseAdapter().getName());
- runner.addControllerService("dbcp", service);
+ runner.addControllerService(SERVICE_ID, service);
runner.enableControllerService(service);
- runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
+ runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, SERVICE_ID);
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "persons");
@@ -324,9 +315,9 @@ public class TestUpdateDatabaseTable {
runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER,
"true");
runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS,
"false");
runner.setProperty(UpdateDatabaseTable.DB_TYPE, new
DerbyDatabaseAdapter().getName());
- runner.addControllerService("dbcp", service);
+ runner.addControllerService(SERVICE_ID, service);
runner.enableControllerService(service);
- runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
+ runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, SERVICE_ID);
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "persons");
@@ -404,9 +395,9 @@ public class TestUpdateDatabaseTable {
runner.setProperty(UpdateDatabaseTable.RECORD_WRITER_FACTORY,
"mock-writer-factory");
runner.setProperty(UpdateDatabaseTable.DB_TYPE, new
DerbyDatabaseAdapter().getName());
- runner.addControllerService("dbcp", service);
+ runner.addControllerService(SERVICE_ID, service);
runner.enableControllerService(service);
- runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
+ runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, SERVICE_ID);
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "persons");
@@ -446,9 +437,9 @@ public class TestUpdateDatabaseTable {
runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER,
"false");
runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS,
"true");
runner.setProperty(UpdateDatabaseTable.DB_TYPE, new
DerbyDatabaseAdapter().getName());
- runner.addControllerService("dbcp", service);
+ runner.addControllerService(SERVICE_ID, service);
runner.enableControllerService(service);
- runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
+ runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, SERVICE_ID);
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "newTable");
@@ -488,32 +479,4 @@ public class TestUpdateDatabaseTable {
assertFalse(rs.next());
}
}
-
-
- /**
- * Simple implementation only for testing purposes
- */
- private static class MockDBCPService extends AbstractControllerService
implements DBCPService {
- private final String dbLocation;
-
- public MockDBCPService(final String dbLocation) {
- this.dbLocation = dbLocation;
- }
-
- @Override
- public String getIdentifier() {
- return "dbcp";
- }
-
- @Override
- public Connection getConnection() throws ProcessException {
- try {
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- return DriverManager.getConnection("jdbc:derby:" + dbLocation
+ ";create=true");
- } catch (final Exception e) {
- throw new ProcessException("getConnection failed: " + e);
- }
- }
- }
-
-}
\ No newline at end of file
+}