This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 08ff54f5fb Add auto commit property to QueryDatabaseTable and
QueryDatabaseTable processors to allow disabling auto commit so PostgreSQL
Fetch Size will work
08ff54f5fb is described below
commit 08ff54f5fb712d01f372fd25a2d904d605e271ba
Author: Jim Steinebrey <[email protected]>
AuthorDate: Tue Mar 19 11:54:38 2024 -0400
Add auto commit property to QueryDatabaseTable and QueryDatabaseTable
processors to allow disabling auto commit so PostgreSQL Fetch Size will work
NIFI-1931 Add proper default value for auto commit (false) to
PostgreSQLDatabaseAdapter to allow FETCH_SIZE to be honored on reads.
NIFI-1931 Added customValidate code to check the auto commit property
setting against the db adapter's required auto commit setting and give
validation error message if they do not match.
NIFI-1931 Added automated test to check the Auto Commit customValidate
error message.
NIFI-1931 remove clearDefaultValue() because it is not needed since
required = false a;ready defaults it to null.
This closes #8534
Signed-off-by: Matt Burgess <[email protected]>
---
.../standard/AbstractQueryDatabaseTable.java | 71 ++++++++++++++-
.../processors/standard/QueryDatabaseTable.java | 1 +
.../standard/QueryDatabaseTableRecord.java | 1 +
.../processors/standard/db/DatabaseAdapter.java | 13 +++
.../db/impl/PostgreSQLDatabaseAdapter.java | 18 ++++
.../processors/standard/QueryDatabaseTableIT.java | 78 ++++++++++++++++
.../standard/QueryDatabaseTableRecordIT.java | 78 ++++++++++++++++
.../standard/QueryDatabaseTableRecordTest.java | 97 ++++++++++++++++++--
.../standard/QueryDatabaseTableTest.java | 101 +++++++++++++++++++--
.../db/impl/TestPostgreSQLDatabaseAdapter.java | 16 ++++
10 files changed, 456 insertions(+), 18 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
index e5fc6745d6..7f7a870fb7 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -90,16 +90,34 @@ public abstract class AbstractQueryDatabaseTable extends
AbstractDatabaseFetchPr
"TRANSACTION_SERIALIZABLE"
);
+ private static final String FETCH_SIZE_NAME = "Fetch Size";
+ private static final String AUTO_COMMIT_NAME = "Set Auto Commit";
+
public static final PropertyDescriptor FETCH_SIZE = new
PropertyDescriptor.Builder()
- .name("Fetch Size")
+ .name(FETCH_SIZE_NAME)
.description("The number of result rows to be fetched from the
result set at a time. This is a hint to the database driver and may not be "
- + "honored and/or exact. If the value specified is zero,
then the hint is ignored.")
+ + "honored and/or exact. If the value specified is zero,
then the hint is ignored. "
+ + "If using PostgreSQL, then '" + AUTO_COMMIT_NAME + "'
must be equal to 'false' to cause '" + FETCH_SIZE_NAME + "' to take effect.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
+ public static final PropertyDescriptor AUTO_COMMIT = new
PropertyDescriptor.Builder()
+ .name(AUTO_COMMIT_NAME)
+ .description("Allows enabling or disabling the auto commit
functionality of the DB connection. Default value is 'No value set'. " +
+ "'No value set' will leave the db connection's auto commit
mode unchanged. " +
+ "For some JDBC drivers such as PostgreSQL driver, it is
required to disable the auto commit functionality " +
+ "to get the '" + FETCH_SIZE_NAME + "' setting to take
effect. " +
+ "When auto commit is enabled, PostgreSQL driver ignores '"
+ FETCH_SIZE_NAME + "' setting and loads all rows of the result set to memory
at once. " +
+ "This could lead for a large amount of memory usage when
executing queries which fetch large data sets. " +
+ "More Details of this behaviour in PostgreSQL driver can
be found in https://jdbc.postgresql.org//documentation/head/query.html.")
+ .allowableValues("true", "false")
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .required(false)
+ .build();
+
public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new
PropertyDescriptor.Builder()
.name("qdbt-max-rows")
.displayName("Max Rows Per Flow File")
@@ -196,6 +214,23 @@ public abstract class AbstractQueryDatabaseTable extends
AbstractDatabaseFetchPr
.build());
}
+ final Boolean propertyAutoCommit =
validationContext.getProperty(AUTO_COMMIT).evaluateAttributeExpressions().asBoolean();
+ final Integer fetchSize =
validationContext.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
+ final DatabaseAdapter dbAdapter =
dbAdapters.get(validationContext.getProperty(DB_TYPE).getValue());
+ final Boolean adapterAutoCommit = dbAdapter == null
+ ? null
+ : dbAdapter.getAutoCommitForReads(fetchSize).orElse(null);
+ if (adapterAutoCommit != null && propertyAutoCommit != null
+ && propertyAutoCommit != adapterAutoCommit ) {
+ results.add(new ValidationResult.Builder().valid(false)
+ .subject(AUTO_COMMIT.getDisplayName())
+ .input(String.valueOf(propertyAutoCommit))
+ .explanation(String.format("'%s' must be set to '%s'
because '%s' %s requires it to be '%s'",
+ AUTO_COMMIT.getDisplayName(), adapterAutoCommit,
+ dbAdapter.getName(), DB_TYPE.getDisplayName(),
adapterAutoCommit))
+ .build());
+ }
+
return results;
}
@@ -304,7 +339,7 @@ public abstract class AbstractQueryDatabaseTable extends
AbstractDatabaseFetchPr
}
}
} catch (final Exception e) {
- logger.error("Unable to execute SQL select query {} due to
{}", new Object[]{selectMaxQuery, e});
+ logger.error("Unable to execute SQL select query {} due to
{}", selectMaxQuery, e);
context.yield();
}
}
@@ -343,6 +378,24 @@ public abstract class AbstractQueryDatabaseTable extends
AbstractDatabaseFetchPr
if (logger.isDebugEnabled()) {
logger.debug("Executing query {}", new Object[] { selectQuery
});
}
+
+ final boolean originalAutoCommit = con.getAutoCommit();
+ final Boolean propertyAutoCommitValue =
context.getProperty(AUTO_COMMIT).evaluateAttributeExpressions().asBoolean();
+ // If user sets AUTO_COMMIT property to non-null (i.e. true or
false), then the property value overrides the dbAdapter's value
+ final Boolean setAutoCommitValue =
+ dbAdapter == null || propertyAutoCommitValue != null
+ ? propertyAutoCommitValue
+ :
dbAdapter.getAutoCommitForReads(fetchSize).orElse(null);
+ if (setAutoCommitValue != null && originalAutoCommit !=
setAutoCommitValue) {
+ try {
+ con.setAutoCommit(setAutoCommitValue);
+ logger.debug("Driver connection changed to
setAutoCommit({})", setAutoCommitValue);
+ } catch (Exception ex) {
+ logger.debug("Failed to setAutoCommit({}) due to {}: {}",
+ setAutoCommitValue, ex.getClass().getName(),
ex.getMessage());
+ }
+ }
+
try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
int fragmentIndex=0;
// Max values will be updated in the state property map by the
callback
@@ -441,12 +494,22 @@ public abstract class AbstractQueryDatabaseTable extends
AbstractDatabaseFetchPr
}
} catch (final SQLException e) {
throw e;
+ } finally {
+ if (con.getAutoCommit() != originalAutoCommit) {
+ try {
+ con.setAutoCommit(originalAutoCommit);
+ logger.debug("Driver connection reset to original
setAutoCommit({})", originalAutoCommit);
+ } catch (Exception ex) {
+ logger.debug("Failed to setAutoCommit({}) due to {}:
{}",
+ originalAutoCommit, ex.getClass().getName(),
ex.getMessage());
+ }
+ }
}
session.transfer(resultSetFlowFiles, REL_SUCCESS);
} catch (final ProcessException | SQLException e) {
- logger.error("Unable to execute SQL select query {} due to {}",
new Object[]{selectQuery, e});
+ logger.error("Unable to execute SQL select query {} due to {}",
selectQuery, e);
if (!resultSetFlowFiles.isEmpty()) {
session.remove(resultSetFlowFiles);
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 24123729b8..51fbc41409 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -109,6 +109,7 @@ public class QueryDatabaseTable extends
AbstractQueryDatabaseTable {
pds.add(INITIAL_LOAD_STRATEGY);
pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE);
+ pds.add(AUTO_COMMIT);
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(OUTPUT_BATCH_SIZE);
pds.add(MAX_FRAGMENTS);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
index 5838d7e46c..2004649976 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
@@ -208,6 +208,7 @@ public class QueryDatabaseTableRecord extends
AbstractQueryDatabaseTable {
pds.add(INITIAL_LOAD_STRATEGY);
pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE);
+ pds.add(AUTO_COMMIT);
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(OUTPUT_BATCH_SIZE);
pds.add(MAX_FRAGMENTS);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
index ab661998ed..65b43ff8b7 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
/**
@@ -211,6 +212,18 @@ public interface DatabaseAdapter {
return Collections.singletonList(createTableStatement.toString());
}
+ /**
+ * Get the auto commit mode to use for reading from this database type.
+ * Most databases do not care which auto commit mode is used to read.
+ * For PostgreSQL it can make a difference.
+ * @param fetchSize The number of rows to retrieve at a time. Value of 0
means retrieve all rows at once.
+ * @return Optional.empty() if auto commit mode does not matter and can be
left as is.
+ * Return true or false to indicate whether auto commit needs to
be true or false for this database.
+ */
+ default Optional<Boolean> getAutoCommitForReads(Integer fetchSize) {
+ return Optional.empty();
+ }
+
default String getSQLForDataType(int sqlType) {
return JDBCType.valueOf(sqlType).getName();
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java
index 5e48818600..8ba7b64b22 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
import static java.sql.Types.CHAR;
@@ -160,6 +161,23 @@ public class PostgreSQLDatabaseAdapter extends
GenericDatabaseAdapter {
.toString());
}
+ /**
+ * Get the auto commit mode to use for reading from this database type.
+ * For PostgreSQL databases, auto commit mode must be set to false to
cause a fetchSize other than 0 to take effect.
+ * More Details of this behaviour in PostgreSQL driver can be found in
https://jdbc.postgresql.org//documentation/head/query.html.")
+ * For PostgreSQL, if autocommit is TRUE, then fetch size is treated as 0
which loads all rows of the result set to memory at once.
+ * @param fetchSize The number of rows to retrieve at a time. Value of 0
means retrieve all rows at once.
+ * @return Optional.empty() if auto commit mode does not matter and can be
left as is.
+ * Return true or false to indicate whether auto commit needs to
be true or false for this database.
+ */
+ @Override
+ public Optional<Boolean> getAutoCommitForReads(Integer fetchSize) {
+ if (fetchSize != null && fetchSize != 0) {
+ return Optional.of(Boolean.FALSE);
+ }
+ return Optional.empty();
+ }
+
@Override
public String getSQLForDataType(int sqlType) {
switch (sqlType) {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java
new file mode 100644
index 0000000000..602a87e945
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.dbcp.DBCPConnectionPool;
+import org.apache.nifi.dbcp.utils.DBCPProperties;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter;
+import org.apache.nifi.reporting.InitializationException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class QueryDatabaseTableIT extends QueryDatabaseTableTest {
+ private static PostgreSQLContainer<?> postgres;
+
+ @BeforeAll
+ public static void setupBeforeClass() {
+ postgres = new PostgreSQLContainer<>("postgres:9.6.12")
+ .withInitScript("PutDatabaseRecordIT/create-person-table.sql");
+ postgres.start();
+ }
+
+ @AfterAll
+ public static void cleanUpAfterClass() {
+ if (postgres != null) {
+ postgres.close();
+ postgres = null;
+ }
+ }
+
+ @Override
+ public DatabaseAdapter createDatabaseAdapter() {
+ return new PostgreSQLDatabaseAdapter();
+ }
+
+ @Override
+ public void createDbcpControllerService() throws InitializationException {
+ final DBCPConnectionPool connectionPool = new DBCPConnectionPool();
+ runner.addControllerService("dbcp", connectionPool);
+ runner.setProperty(connectionPool, DBCPProperties.DATABASE_URL,
postgres.getJdbcUrl());
+ runner.setProperty(connectionPool, DBCPProperties.DB_USER,
postgres.getUsername());
+ runner.setProperty(connectionPool, DBCPProperties.DB_PASSWORD,
postgres.getPassword());
+ runner.setProperty(connectionPool, DBCPProperties.DB_DRIVERNAME,
postgres.getDriverClassName());
+ runner.enableControllerService(connectionPool);
+ }
+
+ @Test
+ public void testAddedRowsAutoCommitTrue() throws SQLException, IOException
{
+ // this test in the base class is not valid for PostgreSQL so check
the validation error message.
+ final AssertionError assertionError =
assertThrows(AssertionError.class, super::testAddedRowsAutoCommitTrue);
+ assertThat(assertionError.getMessage(), equalTo("Processor has 1
validation failures:\n" +
+ "'Set Auto Commit' validated against 'true' is invalid because
'Set Auto Commit' " +
+ "must be set to 'false' because 'PostgreSQL' Database Type
requires it to be 'false'\n"));
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java
new file mode 100644
index 0000000000..4a98f0d48d
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.dbcp.DBCPConnectionPool;
+import org.apache.nifi.dbcp.utils.DBCPProperties;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter;
+import org.apache.nifi.reporting.InitializationException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class QueryDatabaseTableRecordIT extends QueryDatabaseTableRecordTest {
+ private static PostgreSQLContainer<?> postgres;
+
+ @BeforeAll
+ public static void setupBeforeClass() {
+ postgres = new PostgreSQLContainer<>("postgres:9.6.12")
+ .withInitScript("PutDatabaseRecordIT/create-person-table.sql");
+ postgres.start();
+ }
+
+ @AfterAll
+ public static void cleanUpAfterClass() {
+ if (postgres != null) {
+ postgres.close();
+ postgres = null;
+ }
+ }
+
+ @Override
+ public DatabaseAdapter createDatabaseAdapter() {
+ return new PostgreSQLDatabaseAdapter();
+ }
+
+ @Override
+ public void createDbcpControllerService() throws InitializationException {
+ final DBCPConnectionPool connectionPool = new DBCPConnectionPool();
+ runner.addControllerService("dbcp", connectionPool);
+ runner.setProperty(connectionPool, DBCPProperties.DATABASE_URL,
postgres.getJdbcUrl());
+ runner.setProperty(connectionPool, DBCPProperties.DB_USER,
postgres.getUsername());
+ runner.setProperty(connectionPool, DBCPProperties.DB_PASSWORD,
postgres.getPassword());
+ runner.setProperty(connectionPool, DBCPProperties.DB_DRIVERNAME,
postgres.getDriverClassName());
+ runner.enableControllerService(connectionPool);
+ }
+
+ @Test
+ public void testAddedRowsAutoCommitTrue() throws SQLException, IOException
{
+ // this test in the base class is not valid for PostgreSQL so check
the validation error message.
+ final AssertionError assertionError =
assertThrows(AssertionError.class, super::testAddedRowsAutoCommitTrue);
+ assertThat(assertionError.getMessage(), equalTo("Processor has 1
validation failures:\n" +
+ "'Set Auto Commit' validated against 'true' is invalid because
'Set Auto Commit' " +
+ "must be set to 'false' because 'PostgreSQL' Database Type
requires it to be 'false'\n"));
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
index 91b288df16..7e7de06992 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
@@ -71,7 +71,7 @@ public class QueryDatabaseTableRecordTest {
private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
MockQueryDatabaseTableRecord processor;
- private TestRunner runner;
+ protected TestRunner runner;
private final static String DB_LOCATION = "target/db_qdt";
private DatabaseAdapter dbAdapter;
private HashMap<String, DatabaseAdapter> origDbAdapters;
@@ -109,18 +109,25 @@ public class QueryDatabaseTableRecordTest {
System.clearProperty("derby.stream.error.file");
}
+ public DatabaseAdapter createDatabaseAdapter() {
+ return new GenericDatabaseAdapter();
+ }
- @BeforeEach
- public void setup() throws InitializationException, IOException {
+ public void createDbcpControllerService() throws InitializationException {
final DBCPService dbcp = new DBCPServiceSimpleImpl();
final Map<String, String> dbcpProperties = new HashMap<>();
+ runner.addControllerService("dbcp", dbcp, dbcpProperties);
+ runner.enableControllerService(dbcp);
+ }
+
+ @BeforeEach
+ public void setup() throws InitializationException, IOException {
origDbAdapters = new HashMap<>(QueryDatabaseTableRecord.dbAdapters);
- dbAdapter = new GenericDatabaseAdapter();
+ dbAdapter = createDatabaseAdapter();
QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(),
dbAdapter);
processor = new MockQueryDatabaseTableRecord();
runner = TestRunners.newTestRunner(processor);
- runner.addControllerService("dbcp", dbcp, dbcpProperties);
- runner.enableControllerService(dbcp);
+ createDbcpControllerService();
runner.setProperty(QueryDatabaseTableRecord.DBCP_SERVICE, "dbcp");
runner.setProperty(QueryDatabaseTableRecord.DB_TYPE,
dbAdapter.getName());
runner.getStateManager().clear(Scope.CLUSTER);
@@ -371,6 +378,82 @@ public class QueryDatabaseTableRecordTest {
runner.clearTransferState();
}
+ @Test
+ public void testAddedRowsAutoCommitTrue() throws SQLException, IOException
{
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME,
"TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"ID");
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE,
"2");
+ runner.setProperty(QueryDatabaseTableRecord.FETCH_SIZE, "2");
+ runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "true");
+
+ runner.run();
+
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
+
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ assertEquals("TEST_QUERY_DB_TABLE",
flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ flowFile.assertAttributeEquals("record.count", "2");
+
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ flowFile.assertAttributeEquals("record.count", "1");
+ }
+
+ @Test
+ public void testAddedRowsAutoCommitFalse() throws SQLException,
IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME,
"TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"ID");
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE,
"2");
+ runner.setProperty(QueryDatabaseTableRecord.FETCH_SIZE, "2");
+ runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "false");
+
+ runner.run();
+
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
+
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ assertEquals("TEST_QUERY_DB_TABLE",
flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ flowFile.assertAttributeEquals("record.count", "2");
+
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ flowFile.assertAttributeEquals("record.count", "1");
+ }
+
@Test
public void testAddedRowsTwoTables() throws SQLException {
@@ -1415,7 +1498,7 @@ public class QueryDatabaseTableRecordTest {
}
@Stateful(scopes = Scope.CLUSTER, description = "Mock for
QueryDatabaseTableRecord processor")
- private static class MockQueryDatabaseTableRecord extends
QueryDatabaseTableRecord {
+ protected static class MockQueryDatabaseTableRecord extends
QueryDatabaseTableRecord {
void putColumnType(String colName, Integer colType) {
columnTypeMap.put(colName, colType);
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 17ce74bebb..8f360eeb50 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -74,7 +74,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class QueryDatabaseTableTest {
MockQueryDatabaseTable processor;
- private TestRunner runner;
+ protected TestRunner runner;
private final static String DB_LOCATION = "target/db_qdt";
private DatabaseAdapter dbAdapter;
private HashMap<String, DatabaseAdapter> origDbAdapters;
@@ -113,18 +113,25 @@ public class QueryDatabaseTableTest {
System.clearProperty("derby.stream.error.file");
}
+ public DatabaseAdapter createDatabaseAdapter() {
+ return new GenericDatabaseAdapter();
+ }
- @BeforeEach
- public void setup() throws InitializationException, IOException {
+ public void createDbcpControllerService() throws InitializationException {
final DBCPService dbcp = new DBCPServiceSimpleImpl();
final Map<String, String> dbcpProperties = new HashMap<>();
+ runner.addControllerService("dbcp", dbcp, dbcpProperties);
+ runner.enableControllerService(dbcp);
+ }
+
+ @BeforeEach
+ public void setup() throws InitializationException, IOException {
origDbAdapters = new HashMap<>(QueryDatabaseTable.dbAdapters);
- dbAdapter = new GenericDatabaseAdapter();
+ dbAdapter = createDatabaseAdapter();
QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter);
processor = new MockQueryDatabaseTable();
runner = TestRunners.newTestRunner(processor);
- runner.addControllerService("dbcp", dbcp, dbcpProperties);
- runner.enableControllerService(dbcp);
+ createDbcpControllerService();
runner.setProperty(QueryDatabaseTable.DBCP_SERVICE, "dbcp");
runner.setProperty(QueryDatabaseTable.DB_TYPE, dbAdapter.getName());
runner.getStateManager().clear(Scope.CLUSTER);
@@ -373,6 +380,86 @@ public class QueryDatabaseTableTest {
runner.clearTransferState();
}
+ @Test
+ public void testAddedRowsAutoCommitTrue() throws SQLException, IOException
{
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(QueryDatabaseTable.TABLE_NAME,
"TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "2");
+ runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
+ runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "true");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS,
2);
+
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+ assertEquals("TEST_QUERY_DB_TABLE",
flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
+ assertEquals(2, getNumberOfRecordsFromStream(in));
+
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1);
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ in = new ByteArrayInputStream(flowFile.toByteArray());
+ assertEquals(1, getNumberOfRecordsFromStream(in));
+ }
+
+ @Test
+ public void testAddedRowsAutoCommitFalse() throws SQLException,
IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(QueryDatabaseTable.TABLE_NAME,
"TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "2");
+ runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
+ runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "false");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS,
2);
+
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+ assertEquals("TEST_QUERY_DB_TABLE",
flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
+ assertEquals(2, getNumberOfRecordsFromStream(in));
+
+ flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1);
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ in = new ByteArrayInputStream(flowFile.toByteArray());
+ assertEquals(1, getNumberOfRecordsFromStream(in));
+ }
+
@Test
public void testAddedRowsTwoTables() throws ClassNotFoundException,
SQLException, InitializationException, IOException {
@@ -1461,7 +1548,7 @@ public class QueryDatabaseTableTest {
}
@Stateful(scopes = Scope.CLUSTER, description = "Mock for
QueryDatabaseTable processor")
- private static class MockQueryDatabaseTable extends QueryDatabaseTable {
+ protected static class MockQueryDatabaseTable extends QueryDatabaseTable {
void putColumnType(String colName, Integer colType) {
columnTypeMap.put(colName, colType);
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPostgreSQLDatabaseAdapter.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPostgreSQLDatabaseAdapter.java
index ea10621867..6d866ef90f 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPostgreSQLDatabaseAdapter.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPostgreSQLDatabaseAdapter.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -41,6 +42,21 @@ public class TestPostgreSQLDatabaseAdapter {
assertTrue(testSubject.supportsUpsert(),
testSubject.getClass().getSimpleName() + " should support upsert");
}
+ @Test
+ public void getAutoCommitForReadsFetchSizeNull() {
+ assertEquals(Optional.empty(),
testSubject.getAutoCommitForReads(null));
+ }
+
+ @Test
+ public void getAutoCommitForReadsFetchSizeZero() {
+ assertEquals(Optional.empty(), testSubject.getAutoCommitForReads(0));
+ }
+
+ @Test
+ public void getAutoCommitForReadsFetchSizeNonZero() {
+ assertEquals(Optional.of(Boolean.FALSE),
testSubject.getAutoCommitForReads(1));
+ }
+
@Test
public void testGetUpsertStatementWithNullTableName() {
testGetUpsertStatement(null, Arrays.asList("notEmpty"),
Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be
null or blank"));