This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d4344a3140 NIFI-13642 Added Delete Keys Property to PutDatabaseRecord
d4344a3140 is described below
commit d4344a3140722cb69202f8b58f190127471d5d30
Author: Rajmund Takacs <[email protected]>
AuthorDate: Tue Aug 6 16:48:12 2024 +0200
NIFI-13642 Added Delete Keys Property to PutDatabaseRecord
- Delete Keys property enables targeted deletes for databases that do not
support primary keys
This closes #9162
Signed-off-by: David Handermann <[email protected]>
---
.../processors/standard/PutDatabaseRecord.java | 102 +++++++++++++--------
.../processors/standard/PutDatabaseRecordTest.java | 8 +-
2 files changed, 70 insertions(+), 40 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 541eca9613..a3cfd0c375 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -16,8 +16,43 @@
*/
package org.apache.nifi.processors.standard;
+import static java.lang.String.format;
+import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT;
+import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.BatchUpdateException;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.SQLDataException;
+import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
+import java.sql.SQLTransientException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.HexFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -60,40 +95,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.sql.BatchUpdateException;
-import java.sql.Clob;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
-import java.sql.SQLDataException;
-import java.sql.SQLException;
-import java.sql.SQLIntegrityConstraintViolationException;
-import java.sql.SQLTransientException;
-import java.sql.Statement;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.HexFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-
-import static java.lang.String.format;
-import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT;
-import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
-import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
-
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"sql", "record", "jdbc", "put", "database", "update", "insert",
"delete"})
@CapabilityDescription("The PutDatabaseRecord processor uses a specified
RecordReader to input (possibly multiple) records from an incoming flow file.
These records are translated to SQL "
@@ -306,6 +307,17 @@ public class PutDatabaseRecord extends AbstractProcessor {
.dependsOn(STATEMENT_TYPE, UPDATE_TYPE, UPSERT_TYPE, SQL_TYPE,
USE_ATTR_TYPE, USE_RECORD_PATH)
.build();
+ static final PropertyDescriptor DELETE_KEYS = new Builder()
+ .name("Delete Keys")
+ .description("A comma-separated list of column names that uniquely
identifies a row in the database for DELETE statements. "
+ + "If the Statement Type is DELETE and this property is
not set, the table's columns are used. "
+ + "This property is ignored if the Statement Type is not
DELETE")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .dependsOn(STATEMENT_TYPE, DELETE_TYPE, SQL_TYPE, USE_ATTR_TYPE,
USE_RECORD_PATH)
+ .build();
+
static final PropertyDescriptor FIELD_CONTAINING_SQL = new Builder()
.name("put-db-record-field-containing-sql")
.displayName("Field Containing SQL")
@@ -427,6 +439,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
UNMATCHED_FIELD_BEHAVIOR,
UNMATCHED_COLUMN_BEHAVIOR,
UPDATE_KEYS,
+ DELETE_KEYS,
FIELD_CONTAINING_SQL,
ALLOW_MULTIPLE_STATEMENTS,
QUOTE_IDENTIFIERS,
@@ -730,6 +743,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final String schemaName =
context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String updateKeys =
context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
+ final String deleteKeys =
context.getProperty(DELETE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
final int maxBatchSize =
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
final int timeoutMillis =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
@@ -795,7 +809,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
} else if
(UPDATE_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateUpdate(recordSchema,
fqTableName, updateKeys, tableSchema, settings);
} else if
(DELETE_TYPE.equalsIgnoreCase(statementType)) {
- sqlHolder = generateDelete(recordSchema,
fqTableName, tableSchema, settings);
+ sqlHolder = generateDelete(recordSchema,
fqTableName, deleteKeys, tableSchema, settings);
} else if
(UPSERT_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateUpsert(recordSchema,
fqTableName, updateKeys, tableSchema, settings);
} else if
(INSERT_IGNORE_TYPE.equalsIgnoreCase(statementType)) {
@@ -1443,7 +1457,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
return new SqlAndIncludedColumns(sqlBuilder.toString(),
includedColumns);
}
- SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema,
final String tableName, final TableSchema tableSchema, final DMLSettings
settings)
+ SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema,
final String tableName, String deleteKeys, final TableSchema tableSchema, final
DMLSettings settings)
throws IllegalArgumentException, MalformedRecordException,
SQLDataException {
final Set<String> normalizedFieldNames =
getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
@@ -1465,14 +1479,28 @@ public class PutDatabaseRecord extends
AbstractProcessor {
sqlBuilder.append(tableName);
// iterate over all of the fields in the record, building the SQL
statement by adding the column names
- List<String> fieldNames = recordSchema.getFieldNames();
+ final List<String> fieldNames = recordSchema.getFieldNames();
final List<Integer> includedColumns = new ArrayList<>();
if (fieldNames != null) {
sqlBuilder.append(" WHERE ");
int fieldCount = fieldNames.size();
AtomicInteger fieldsFound = new AtomicInteger(0);
+ // If 'deleteKeys' is not specified by the user, then all columns
of the table
+ // should be used in the 'WHERE' clause, in order to keep the
original behavior.
+ final Set<String> deleteKeysSet;
+ if (deleteKeys == null) {
+ deleteKeysSet = new HashSet<>(fieldNames);
+ } else {
+ deleteKeysSet = Arrays.stream(deleteKeys.split(","))
+ .map(String::trim)
+ .collect(Collectors.toSet());
+ }
+
for (int i = 0; i < fieldCount; i++) {
+ if (!deleteKeysSet.contains(fieldNames.get(i))) {
+ continue; // skip this field if it should not be included
in 'WHERE'
+ }
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index b03197a731..307c230ced 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -405,7 +405,9 @@ public class PutDatabaseRecordTest {
assertEquals("UPDATE PERSONS SET name = ?, code = ? WHERE id = ?",
processor.generateUpdate(schema, "PERSONS", null, tableSchema,
settings).getSql());
assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (name = ? OR
(name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))",
- processor.generateDelete(schema, "PERSONS", tableSchema,
settings).getSql());
+ processor.generateDelete(schema, "PERSONS", null, tableSchema,
settings).getSql());
+ assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (code = ? OR
(code is null AND ? is null))",
+ processor.generateDelete(schema, "PERSONS", "id, code",
tableSchema, settings).getSql());
}
@Test
@@ -450,7 +452,7 @@ public class PutDatabaseRecordTest {
assertEquals("Cannot map field 'non_existing' to any column in the
database\nColumns: id,name,code", e.getMessage());
e = assertThrows(SQLDataException.class,
- () -> processor.generateDelete(schema, "PERSONS", tableSchema,
settings),
+ () -> processor.generateDelete(schema, "PERSONS", null,
tableSchema, settings),
"generateDelete should fail with unmatched fields");
assertEquals("Cannot map field 'non_existing' to any column in the
database\nColumns: id,name,code", e.getMessage());
}
@@ -2495,4 +2497,4 @@ public class PutDatabaseRecordTest {
"; batchSize=" + String.valueOf(batchSize);
}
}
-}
\ No newline at end of file
+}