exceptionfactory commented on code in PR #10536:
URL: https://github.com/apache/nifi/pull/10536#discussion_r2560346601
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java:
##########
@@ -1629,6 +1668,42 @@ private Set<String> normalizeKeyColumnNamesAndCheckForValues(RecordSchema record
return normalizedKeyColumnNames;
}
+ /*
+ * Extract list of queries from config property
+ */
+ private List<String> getQueries(final String value) {
+ if (value == null || value.isEmpty() || value.isBlank()) {
+ return null;
+ }
+ final List<String> queries = new ArrayList<>();
+ for (String query : value.split("(?<!\\\\);")) {
+ query = query.replaceAll("\\\\;", ";");
+ if (!query.isBlank()) {
+ queries.add(query.trim());
+ }
+ }
+ return queries;
+ }
+
+ /*
+ * Executes given queries using pre-defined connection.
+ * Returns null on success, or a query string if failed.
+ */
+ private Pair<String, SQLException> executeConfigStatements(final Connection con, final List<String> configQueries) {
Review Comment:
Catching and returning the exception, just to throw it from the calling
method, does not seem like the best approach. I recommend simply declaring
`throws SQLException` on this method and not returning anything.
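A minimal sketch of that shape, assuming the method only needs to run each statement in order. The class name `ConfigStatementsSketch` and the decision to wrap the failure so that the message names the failing query are illustrative assumptions, not code from the PR:

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

// Hypothetical holder class so the sketch compiles on its own.
class ConfigStatementsSketch {

    // No Pair return value: a failure surfaces directly as an SQLException.
    static void executeConfigStatements(final Connection con, final List<String> configQueries) throws SQLException {
        if (configQueries == null || configQueries.isEmpty()) {
            return;
        }
        for (final String query : configQueries) {
            try (Statement statement = con.createStatement()) {
                statement.execute(query);
            } catch (final SQLException e) {
                // Assumption: wrapping keeps the failing query visible to the caller.
                // Letting the original exception propagate unchanged would also satisfy the suggestion.
                throw new SQLException("Failed to execute configuration query [" + query + "]", e);
            }
        }
    }
}
```

The caller then handles a single `SQLException` path and no longer has to unpack a returned pair.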
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java:
##########
@@ -2157,6 +2157,234 @@ public void testInsertLongVarBinaryColumn() throws InitializationException, Proc
conn.close();
}
+ @Test
+ public void testInsertWithPreQuery() throws InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
+ recreateTable(createPersons);
+ final MockRecordParser parser = new MockRecordParser();
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+
+ parser.addSchemaField("id", RecordFieldType.INT);
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("code", RecordFieldType.INT);
+ parser.addSchemaField("dt", RecordFieldType.DATE);
+
+ LocalDate testDate1 = LocalDate.of(2021, 1, 26);
+ Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
+ LocalDate testDate2 = LocalDate.of(2021, 7, 26);
+ Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ
+
+ parser.addRecord(1, "rec1", 101, jdbcDate1);
+ parser.addRecord(2, "rec2", 102, jdbcDate2);
+ parser.addRecord(3, "rec3", 103, null);
+ parser.addRecord(4, "rec4", 104, null);
+ parser.addRecord(5, null, 105, null);
+
+ runner.setProperty(PutDatabaseRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
+ final Connection conn = dbcp.getConnection();
+ final Statement stmt = conn.createStatement();
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("rec1", rs.getString(2));
+ assertEquals(101, rs.getInt(3));
+ assertEquals(jdbcDate1.toString(), rs.getDate(4).toString());
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("rec2", rs.getString(2));
+ assertEquals(102, rs.getInt(3));
+ assertEquals(jdbcDate2.toString(), rs.getDate(4).toString());
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+ assertEquals("rec3", rs.getString(2));
+ assertEquals(103, rs.getInt(3));
+ assertNull(rs.getDate(4));
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+ assertEquals("rec4", rs.getString(2));
+ assertEquals(104, rs.getInt(3));
+ assertNull(rs.getDate(4));
+ assertTrue(rs.next());
+ assertEquals(5, rs.getInt(1));
+ assertNull(rs.getString(2));
+ assertEquals(105, rs.getInt(3));
+ assertNull(rs.getDate(4));
+ assertFalse(rs.next());
Review Comment:
These assertions appear to be duplicated in the other test method. I recommend
creating a shared private method, such as `assertResultsFound()`, and
delegating the evaluation to it.
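One possible shape for such a helper, sketched with JDK classes only so that it compiles on its own. The `PersonRowAssertions` class, the `ExpectedPerson` record, and the `check` method are hypothetical names; inside `PutDatabaseRecordTest` the checks would more naturally use the JUnit assertions the class already imports:

```java
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Objects;

// Hypothetical holder class; in the PR this would be a private method of the test class.
class PersonRowAssertions {

    // Hypothetical value holder for one expected PERSONS row.
    record ExpectedPerson(int id, String name, int code, Date dt) { }

    // Walks the ResultSet in order and fails on the first mismatch, missing row, or extra row.
    static void assertResultsFound(final ResultSet rs, final ExpectedPerson... expected) throws SQLException {
        for (final ExpectedPerson person : expected) {
            check(rs.next(), "Missing row for id " + person.id());
            check(person.id() == rs.getInt(1), "Unexpected id, wanted " + person.id());
            check(Objects.equals(person.name(), rs.getString(2)), "Unexpected name for id " + person.id());
            check(person.code() == rs.getInt(3), "Unexpected code for id " + person.id());
            // Dates are compared through toString(), mirroring the assertions in the diff.
            check(Objects.equals(Objects.toString(person.dt(), null), Objects.toString(rs.getDate(4), null)),
                    "Unexpected date for id " + person.id());
        }
        check(!rs.next(), "Unexpected extra row");
    }

    private static void check(final boolean condition, final String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```

With a helper like this, each test method could end with one call that lists the five expected rows, for example `assertResultsFound(rs, new ExpectedPerson(1, "rec1", 101, jdbcDate1), ...)`.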
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java:
##########
@@ -413,6 +414,28 @@ The name of the database (or the name of the catalog, depending on the destinati
.required(false)
.build();
+ static final PropertyDescriptor SQL_PRE_QUERY = new PropertyDescriptor.Builder()
+ .name("SQL Pre-Query")
+ .description("A semicolon-delimited list of queries executed before the main SQL query is executed. " +
+ "For example, set session properties before main query. " +
+ "It's possible to include semicolons in the statements themselves by escaping them with a backslash ('\\;'). " +
+ "Results/outputs from these queries will be suppressed if there are no errors.")
Review Comment:
A text block (multiline string) can be used instead of string concatenation to
build the description.
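As a rough illustration, the same description can be written as a Java text block. The class and constant names below are hypothetical. The trailing backslash on each line is the text block line-continuation escape, used here on the assumption that the description should stay a single line; dropping the backslashes would keep the line breaks in the description instead:

```java
// Hypothetical holder class comparing the two ways of building the same description.
class DescriptionTextBlockSketch {

    // Concatenated form, as in the diff under review.
    static final String CONCATENATED = "A semicolon-delimited list of queries executed before the main SQL query is executed. " +
            "For example, set session properties before main query. " +
            "It's possible to include semicolons in the statements themselves by escaping them with a backslash ('\\;'). " +
            "Results/outputs from these queries will be suppressed if there are no errors.";

    // Text block form: "\" at the end of a line suppresses the line break,
    // and "\\;" still produces a literal backslash followed by a semicolon.
    static final String TEXT_BLOCK = """
            A semicolon-delimited list of queries executed before the main SQL query is executed. \
            For example, set session properties before main query. \
            It's possible to include semicolons in the statements themselves by escaping them with a backslash ('\\;'). \
            Results/outputs from these queries will be suppressed if there are no errors.""";
}
```

In the property descriptor, the text block would be passed straight to `.description(...)` in place of the concatenated string.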