This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 502572b2f5 NIFI-11858 Configurable Column Name Normalization in 
PutDatabaseRecord and UpdateDatabaseTable
502572b2f5 is described below

commit 502572b2f5911685a5baf8ce70a4c8f5f90b668b
Author: ravisingh <[email protected]>
AuthorDate: Fri Oct 11 17:38:00 2024 -0700

    NIFI-11858 Configurable Column Name Normalization in PutDatabaseRecord and 
UpdateDatabaseTable
    
    Cleanup and required changes for https://github.com/apache/nifi/pull/8995
    
    Updated the description to reflect that column names are uppercased for 
case-insensitive matching irrespective of strategy
    
    Added examples for REMOVE_ALL_SPECIAL_CHAR and PATTERN
    
    Signed-off-by: Matt Burgess <[email protected]>
    
    This closes #9382
---
 .../processors/standard/PutDatabaseRecord.java     | 207 ++++++++++++---------
 .../processors/standard/UpdateDatabaseTable.java   |  50 ++++-
 .../processors/standard/db/NameNormalizer.java     |  24 +++
 .../standard/db/NameNormalizerFactory.java         |  38 ++++
 .../nifi/processors/standard/db/TableSchema.java   |  27 ++-
 .../standard/db/TranslationStrategy.java           |  60 ++++++
 .../standard/db/impl/PatternNormalizer.java        |  41 ++++
 .../db/impl/RemoveAllSpecialCharNormalizer.java    |  29 +++
 .../standard/db/impl/RemoveSpaceNormalizer.java    |  27 +++
 .../impl/RemoveUnderscoreAndSpaceNormalizer.java   |  28 +++
 .../db/impl/RemoveUnderscoreNormalizer.java        |  28 +++
 .../processors/standard/PutDatabaseRecordTest.java |  44 ++---
 .../db/impl/TestOracle12DatabaseAdapter.java       |  14 +-
 .../db/impl/TestOracleDatabaseAdapter.java         |   7 +-
 14 files changed, 500 insertions(+), 124 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index a3cfd0c375..19192e57df 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -23,6 +23,7 @@ import static 
org.apache.nifi.expression.ExpressionLanguageScope.NONE;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -47,12 +48,15 @@ import java.util.HashSet;
 import java.util.HexFormat;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -79,7 +83,10 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
 import org.apache.nifi.processors.standard.db.ColumnDescription;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
 import org.apache.nifi.processors.standard.db.TableSchema;
+import org.apache.nifi.processors.standard.db.TranslationStrategy;
 import org.apache.nifi.record.path.FieldValue;
 import org.apache.nifi.record.path.RecordPath;
 import org.apache.nifi.record.path.RecordPathResult;
@@ -184,25 +191,26 @@ public class PutDatabaseRecord extends AbstractProcessor {
             .build();
 
     static final PropertyDescriptor STATEMENT_TYPE_RECORD_PATH = new Builder()
-        .name("Statement Type Record Path")
-        .displayName("Statement Type Record Path")
-        .description("Specifies a RecordPath to evaluate against each Record 
in order to determine the Statement Type. The RecordPath should equate to 
either INSERT, UPDATE, UPSERT, or DELETE. "
-                + "(Debezium style operation types are also supported: \"r\" 
and \"c\" for INSERT, \"u\" for UPDATE, and \"d\" for DELETE)")
-        .required(true)
-        .addValidator(new RecordPathValidator())
-        .expressionLanguageSupported(NONE)
-        .dependsOn(STATEMENT_TYPE, USE_RECORD_PATH)
-        .build();
+            .name("Statement Type Record Path")
+            .displayName("Statement Type Record Path")
+            .description("Specifies a RecordPath to evaluate against each 
Record in order to determine the Statement Type. The RecordPath should equate 
to either INSERT, UPDATE, UPSERT, or DELETE. "
+                    + "(Debezium style operation types are also supported: 
\"r\" and \"c\" for INSERT, \"u\" for UPDATE, and \"d\" for DELETE)")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .dependsOn(STATEMENT_TYPE, USE_RECORD_PATH)
+            .build();
 
     static final PropertyDescriptor DATA_RECORD_PATH = new Builder()
-        .name("Data Record Path")
-        .displayName("Data Record Path")
-        .description("If specified, this property denotes a RecordPath that 
will be evaluated against each incoming Record and the Record that results from 
evaluating the RecordPath will be sent to" +
-            " the database instead of sending the entire incoming Record. If 
not specified, the entire incoming Record will be published to the database.")
-        .required(false)
-        .addValidator(new RecordPathValidator())
-        .expressionLanguageSupported(NONE)
-        .build();
+            .name("Data Record Path")
+            .displayName("Data Record Path")
+            .description("If specified, this property denotes a RecordPath 
that will be evaluated against each incoming" +
+                    " Record and the Record that results from evaluating the 
RecordPath will be sent to" +
+                    " the database instead of sending the entire incoming 
Record. If not specified, the entire incoming Record will be published to the 
database.")
+            .required(false)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
 
     static final PropertyDescriptor DBCP_SERVICE = new Builder()
             .name("put-db-record-dcbp-service")
@@ -277,6 +285,25 @@ public class PutDatabaseRecord extends AbstractProcessor {
             .allowableValues("true", "false")
             .defaultValue("true")
             .build();
+    public static final PropertyDescriptor TRANSLATION_STRATEGY = new 
PropertyDescriptor.Builder()
+            .required(true)
+            .name("Column Name Translation Strategy")
+            .description("The strategy used to normalize table column name. 
Column Name will be uppercased to " +
+                    "do case-insensitive matching irrespective of strategy")
+            .allowableValues(TranslationStrategy.class)
+            .defaultValue(TranslationStrategy.REMOVE_UNDERSCORE.getValue())
+            .dependsOn(TRANSLATE_FIELD_NAMES, 
TRANSLATE_FIELD_NAMES.getDefaultValue())
+            .build();
+
+    public static final PropertyDescriptor TRANSLATION_PATTERN = new 
PropertyDescriptor.Builder()
+            .required(true)
+            .name("Column Name Translation Pattern")
+            .description("Column name will be normalized with this regular 
expression")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .dependsOn(TRANSLATE_FIELD_NAMES, 
TRANSLATE_FIELD_NAMES.getDefaultValue())
+            .dependsOn(TRANSLATION_STRATEGY, 
TranslationStrategy.PATTERN.getValue())
+            .build();
 
     static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new Builder()
             .name("put-db-record-unmatched-field-behavior")
@@ -415,14 +442,14 @@ public class PutDatabaseRecord extends AbstractProcessor {
         });
 
         DB_TYPE = new Builder()
-            .name("db-type")
-            .displayName("Database Type")
-            .description("The type/flavor of database, used for generating 
database-specific code. In many cases the Generic type "
-                + "should suffice, but some databases (such as Oracle) require 
custom SQL clauses. ")
-            .allowableValues(dbAdapterValues.toArray(new AllowableValue[0]))
-            .defaultValue("Generic")
-            .required(false)
-            .build();
+                .name("db-type")
+                .displayName("Database Type")
+                .description("The type/flavor of database, used for generating 
database-specific code. In many cases the Generic type "
+                        + "should suffice, but some databases (such as Oracle) 
require custom SQL clauses. ")
+                .allowableValues(dbAdapterValues.toArray(new 
AllowableValue[0]))
+                .defaultValue("Generic")
+                .required(false)
+                .build();
 
         properties = List.of(
                 RECORD_READER_FACTORY,
@@ -436,6 +463,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 TABLE_NAME,
                 BINARY_STRING_FORMAT,
                 TRANSLATE_FIELD_NAMES,
+                TRANSLATION_STRATEGY,
+                TRANSLATION_PATTERN,
                 UNMATCHED_FIELD_BEHAVIOR,
                 UNMATCHED_COLUMN_BEHAVIOR,
                 UPDATE_KEYS,
@@ -473,12 +502,12 @@ public class PutDatabaseRecord extends AbstractProcessor {
         DatabaseAdapter databaseAdapter = 
dbAdapters.get(validationContext.getProperty(DB_TYPE).getValue());
         String statementType = 
validationContext.getProperty(STATEMENT_TYPE).getValue();
         if ((UPSERT_TYPE.equals(statementType) && 
!databaseAdapter.supportsUpsert())
-            || (INSERT_IGNORE_TYPE.equals(statementType) && 
!databaseAdapter.supportsInsertIgnore())) {
+                || (INSERT_IGNORE_TYPE.equals(statementType) && 
!databaseAdapter.supportsInsertIgnore())) {
             validationResults.add(new ValidationResult.Builder()
-                .subject(STATEMENT_TYPE.getDisplayName())
-                .valid(false)
-                .explanation(databaseAdapter.getName() + " does not support " 
+ statementType)
-                .build()
+                    .subject(STATEMENT_TYPE.getDisplayName())
+                    .valid(false)
+                    .explanation(databaseAdapter.getName() + " does not 
support " + statementType)
+                    .build()
             );
         }
 
@@ -495,15 +524,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
         }
 
         if (autoCommit != null && autoCommit && 
!isMaxBatchSizeHardcodedToZero(validationContext)) {
-                final String explanation = format("'%s' must be hard-coded to 
zero when '%s' is set to 'true'."
-                                + " Batch size equal to zero executes all 
statements in a single transaction"
-                                + " which allows rollback to revert all the 
flow file's statements together if an error occurs.",
-                        MAX_BATCH_SIZE.getDisplayName(), 
AUTO_COMMIT.getDisplayName());
-
-                validationResults.add(new ValidationResult.Builder()
-                        .subject(MAX_BATCH_SIZE.getDisplayName())
-                        .explanation(explanation)
-                        .build());
+            final String explanation = format("'%s' must be hard-coded to zero 
when '%s' is set to 'true'."
+                            + " Batch size equal to zero executes all 
statements in a single transaction"
+                            + " which allows rollback to revert all the flow 
file's statements together if an error occurs.",
+                    MAX_BATCH_SIZE.getDisplayName(), 
AUTO_COMMIT.getDisplayName());
+
+            validationResults.add(new ValidationResult.Builder()
+                    .subject(MAX_BATCH_SIZE.getDisplayName())
+                    .explanation(explanation)
+                    .build());
         }
 
         return validationResults;
@@ -692,7 +721,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     final String regex = "(?<!\\\\);";
                     sqlStatements = (sql).split(regex);
                 } else {
-                    sqlStatements = new String[] {sql};
+                    sqlStatements = new String[]{sql};
                 }
 
                 if (isFirstRecord) {
@@ -735,7 +764,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
     private void executeDML(final ProcessContext context, final ProcessSession 
session, final FlowFile flowFile,
                             final Connection con, final RecordReader 
recordReader, final String explicitStatementType, final DMLSettings settings)
-        throws IllegalArgumentException, MalformedRecordException, 
IOException, SQLException {
+            throws IllegalArgumentException, MalformedRecordException, 
IOException, SQLException {
 
         final ComponentLog log = getLogger();
 
@@ -753,13 +782,16 @@ public class PutDatabaseRecord extends AbstractProcessor {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException(format("Cannot process %s 
because Table Name is null or empty", flowFile));
         }
-
+        final NameNormalizer normalizer = Optional.of(settings)
+                .filter(s -> s.translateFieldNames)
+                .map(s -> 
NameNormalizerFactory.getNormalizer(s.translationStrategy, 
s.translationPattern))
+                .orElse(null);
         final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, 
schemaName, tableName);
         final TableSchema tableSchema;
         try {
             tableSchema = schemaCache.get(schemaKey, key -> {
                 try {
-                    final TableSchema schema = TableSchema.from(con, catalog, 
schemaName, tableName, settings.translateFieldNames, updateKeys, log);
+                    final TableSchema schema = TableSchema.from(con, catalog, 
schemaName, tableName, settings.translateFieldNames, normalizer, updateKeys, 
log);
                     getLogger().debug("Fetched Table Schema {} for table name 
{}", schema, tableName);
                     return schema;
                 } catch (SQLException e) {
@@ -780,7 +812,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         }
 
         // build the fully qualified table name
-        final String fqTableName =  generateTableName(settings, catalog, 
schemaName, tableName, tableSchema);
+        final String fqTableName = generateTableName(settings, catalog, 
schemaName, tableName, tableSchema);
 
         final Map<String, PreparedSqlAndColumns> preparedSql = new HashMap<>();
         int currentBatchSize = 0;
@@ -805,15 +837,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
                         final SqlAndIncludedColumns sqlHolder;
                         if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
-                            sqlHolder = generateInsert(recordSchema, 
fqTableName, tableSchema, settings);
+                            sqlHolder = generateInsert(recordSchema, 
fqTableName, tableSchema, settings, normalizer);
                         } else if 
(UPDATE_TYPE.equalsIgnoreCase(statementType)) {
-                            sqlHolder = generateUpdate(recordSchema, 
fqTableName, updateKeys, tableSchema, settings);
+                            sqlHolder = generateUpdate(recordSchema, 
fqTableName, updateKeys, tableSchema, settings, normalizer);
                         } else if 
(DELETE_TYPE.equalsIgnoreCase(statementType)) {
-                            sqlHolder = generateDelete(recordSchema, 
fqTableName, deleteKeys, tableSchema, settings);
+                            sqlHolder = generateDelete(recordSchema, 
fqTableName, deleteKeys, tableSchema, settings, normalizer);
                         } else if 
(UPSERT_TYPE.equalsIgnoreCase(statementType)) {
-                            sqlHolder = generateUpsert(recordSchema, 
fqTableName, updateKeys, tableSchema, settings);
+                            sqlHolder = generateUpsert(recordSchema, 
fqTableName, updateKeys, tableSchema, settings, normalizer);
                         } else if 
(INSERT_IGNORE_TYPE.equalsIgnoreCase(statementType)) {
-                            sqlHolder = generateInsertIgnore(recordSchema, 
fqTableName, updateKeys, tableSchema, settings);
+                            sqlHolder = generateInsertIgnore(recordSchema, 
fqTableName, updateKeys, tableSchema, settings, normalizer);
                         } else {
                             throw new 
IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", 
statementType, flowFile));
                         }
@@ -843,7 +875,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     if (ps != lastPreparedStatement && lastPreparedStatement 
!= null) {
                         batchIndex++;
                         log.debug("Executing query {} because Statement Type 
changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size: 
{}",
-                            sql, flowFile, fieldIndexes, batchIndex, 
currentBatchSize);
+                                sql, flowFile, fieldIndexes, batchIndex, 
currentBatchSize);
                         lastPreparedStatement.executeBatch();
 
                         session.adjustCounter("Batches Executed", 1, false);
@@ -863,7 +895,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                         final DataType dataType = 
dataTypes.get(currentFieldIndex);
                         final int fieldSqlType = 
DataTypeUtils.getSQLTypeValue(dataType);
                         final String fieldName = 
recordSchema.getField(currentFieldIndex).getFieldName();
-                        String columnName = 
ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames);
+                        String columnName = 
TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer);
                         int sqlType;
 
                         final ColumnDescription column = 
columns.get(columnName);
@@ -952,7 +984,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     if (++currentBatchSize == maxBatchSize) {
                         batchIndex++;
                         log.debug("Executing query {} because batch reached 
max size for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
-                            sql, flowFile, fieldIndexes, batchIndex, 
currentBatchSize);
+                                sql, flowFile, fieldIndexes, batchIndex, 
currentBatchSize);
                         session.adjustCounter("Batches Executed", 1, false);
                         ps.executeBatch();
                         currentBatchSize = 0;
@@ -1081,7 +1113,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
             final RecordFieldType fieldType = 
fieldValue.getField().getDataType().getFieldType();
             if (fieldType != RecordFieldType.RECORD) {
                 throw new ProcessException("RecordPath " + 
dataRecordPath.getPath() + " evaluated against Record expected to return one or 
more Records but encountered field of type" +
-                    " " + fieldType);
+                        " " + fieldType);
             }
         }
 
@@ -1185,18 +1217,18 @@ public class PutDatabaseRecord extends 
AbstractProcessor {
         return tableNameBuilder.toString();
     }
 
-    private Set<String> getNormalizedColumnNames(final RecordSchema schema, 
final boolean translateFieldNames) {
+    private Set<String> getNormalizedColumnNames(final RecordSchema schema, 
final boolean translateFieldNames, NameNormalizer normalizer) {
         final Set<String> normalizedFieldNames = new HashSet<>();
         if (schema != null) {
-            schema.getFieldNames().forEach((fieldName) -> 
normalizedFieldNames.add(ColumnDescription.normalizeColumnName(fieldName, 
translateFieldNames)));
+            schema.getFieldNames().forEach((fieldName) -> 
normalizedFieldNames.add(TableSchema.normalizedName(fieldName, 
translateFieldNames, normalizer)));
         }
         return normalizedFieldNames;
     }
 
-    SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema, 
final String tableName, final TableSchema tableSchema, final DMLSettings 
settings)
+    SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema, 
final String tableName, final TableSchema tableSchema, final DMLSettings 
settings, NameNormalizer normalizer)
             throws IllegalArgumentException, SQLException {
 
-        checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
+        checkValuesForRequiredColumns(recordSchema, tableSchema, settings, 
normalizer);
 
         final StringBuilder sqlBuilder = new StringBuilder();
         sqlBuilder.append("INSERT INTO ");
@@ -1214,7 +1246,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final ColumnDescription desc = 
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, 
settings.translateFieldNames));
+                final ColumnDescription desc = 
tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, 
settings.translateFieldNames, normalizer));
                 if (desc == null && !settings.ignoreUnmappedFields) {
                     throw new SQLDataException("Cannot map field '" + 
fieldName + "' to any column in the database\n"
                             + (settings.translateFieldNames ? "Normalized " : 
"") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@@ -1254,13 +1286,13 @@ public class PutDatabaseRecord extends 
AbstractProcessor {
     }
 
     SqlAndIncludedColumns generateUpsert(final RecordSchema recordSchema, 
final String tableName, final String updateKeys,
-                                         final TableSchema tableSchema, final 
DMLSettings settings)
-        throws IllegalArgumentException, SQLException, 
MalformedRecordException {
+                                         final TableSchema tableSchema, final 
DMLSettings settings, NameNormalizer normalizer)
+            throws IllegalArgumentException, SQLException, 
MalformedRecordException {
 
-        checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
+        checkValuesForRequiredColumns(recordSchema, tableSchema, settings, 
normalizer);
 
         Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, 
updateKeys, tableSchema);
-        normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, 
settings, keyColumnNames);
+        normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, 
settings, keyColumnNames, normalizer);
 
         List<String> usedColumnNames = new ArrayList<>();
         List<Integer> usedColumnIndices = new ArrayList<>();
@@ -1273,7 +1305,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final ColumnDescription desc = 
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, 
settings.translateFieldNames));
+                final ColumnDescription desc = 
tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, 
settings.translateFieldNames, normalizer));
                 if (desc == null && !settings.ignoreUnmappedFields) {
                     throw new SQLDataException("Cannot map field '" + 
fieldName + "' to any column in the database\n"
                             + (settings.translateFieldNames ? "Normalized " : 
"") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@@ -1308,13 +1340,13 @@ public class PutDatabaseRecord extends 
AbstractProcessor {
     }
 
     SqlAndIncludedColumns generateInsertIgnore(final RecordSchema 
recordSchema, final String tableName, final String updateKeys,
-                                               final TableSchema tableSchema, 
final DMLSettings settings)
+                                               final TableSchema tableSchema, 
final DMLSettings settings, NameNormalizer normalizer)
             throws IllegalArgumentException, SQLException, 
MalformedRecordException {
 
-        checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
+        checkValuesForRequiredColumns(recordSchema, tableSchema, settings, 
normalizer);
 
         Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, 
updateKeys, tableSchema);
-        normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, 
settings, keyColumnNames);
+        normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, 
settings, keyColumnNames, normalizer);
 
         List<String> usedColumnNames = new ArrayList<>();
         List<Integer> usedColumnIndices = new ArrayList<>();
@@ -1327,7 +1359,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final ColumnDescription desc = 
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, 
settings.translateFieldNames));
+                final ColumnDescription desc = 
tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, 
settings.translateFieldNames, normalizer));
                 if (desc == null && !settings.ignoreUnmappedFields) {
                     throw new SQLDataException("Cannot map field '" + 
fieldName + "' to any column in the database\n"
                             + (settings.translateFieldNames ? "Normalized " : 
"") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@@ -1363,11 +1395,11 @@ public class PutDatabaseRecord extends 
AbstractProcessor {
     }
 
     SqlAndIncludedColumns generateUpdate(final RecordSchema recordSchema, 
final String tableName, final String updateKeys,
-                                         final TableSchema tableSchema, final 
DMLSettings settings)
+                                         final TableSchema tableSchema, final 
DMLSettings settings, NameNormalizer normalizer)
             throws IllegalArgumentException, MalformedRecordException, 
SQLException {
 
         final Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, 
updateKeys, tableSchema);
-        final Set<String> normalizedKeyColumnNames = 
normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, 
keyColumnNames);
+        final Set<String> normalizedKeyColumnNames = 
normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, 
keyColumnNames, normalizer);
 
         final StringBuilder sqlBuilder = new StringBuilder();
         sqlBuilder.append("UPDATE ");
@@ -1386,8 +1418,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final String normalizedColName = 
ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames);
-                final ColumnDescription desc = 
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, 
settings.translateFieldNames));
+                final String normalizedColName = 
TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer);
+                final ColumnDescription desc = 
tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, 
settings.translateFieldNames, normalizer));
                 if (desc == null) {
                     if (!settings.ignoreUnmappedFields) {
                         throw new SQLDataException("Cannot map field '" + 
fieldName + "' to any column in the database\n"
@@ -1426,8 +1458,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 String fieldName = field.getFieldName();
                 boolean firstUpdateKey = true;
 
-                final String normalizedColName = 
ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames);
-                final ColumnDescription desc = 
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, 
settings.translateFieldNames));
+                final String normalizedColName = 
TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer);
+                final ColumnDescription desc = 
tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, 
settings.translateFieldNames, normalizer));
                 if (desc != null) {
 
                     // Check if this column is a Update Key. If so, add it to 
the WHERE clause
@@ -1457,12 +1489,13 @@ public class PutDatabaseRecord extends 
AbstractProcessor {
         return new SqlAndIncludedColumns(sqlBuilder.toString(), 
includedColumns);
     }
 
-    SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, 
final String tableName, String deleteKeys, final TableSchema tableSchema, final 
DMLSettings settings)
+    SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, 
final String tableName, String deleteKeys, final TableSchema tableSchema,
+                                         final DMLSettings settings, 
NameNormalizer normalizer)
             throws IllegalArgumentException, MalformedRecordException, 
SQLDataException {
 
-        final Set<String> normalizedFieldNames = 
getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
+        final Set<String> normalizedFieldNames = 
getNormalizedColumnNames(recordSchema, settings.translateFieldNames, 
normalizer);
         for (final String requiredColName : 
tableSchema.getRequiredColumnNames()) {
-            final String normalizedColName = 
ColumnDescription.normalizeColumnName(requiredColName, 
settings.translateFieldNames);
+            final String normalizedColName = 
TableSchema.normalizedName(requiredColName, settings.translateFieldNames, 
normalizer);
             if (!normalizedFieldNames.contains(normalizedColName)) {
                 String missingColMessage = "Record does not have a value for 
the Required column '" + requiredColName + "'";
                 if (settings.failUnmappedColumns) {
@@ -1505,7 +1538,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final ColumnDescription desc = 
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, 
settings.translateFieldNames));
+                final ColumnDescription desc = 
tableSchema.getColumns().get(TableSchema.normalizedName(fieldName, 
settings.translateFieldNames, normalizer));
                 if (desc == null && !settings.ignoreUnmappedFields) {
                     throw new SQLDataException("Cannot map field '" + 
fieldName + "' to any column in the database\n"
                             + (settings.translateFieldNames ? "Normalized " : 
"") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@@ -1554,11 +1587,11 @@ public class PutDatabaseRecord extends 
AbstractProcessor {
         return new SqlAndIncludedColumns(sqlBuilder.toString(), 
includedColumns);
     }
 
-    private void checkValuesForRequiredColumns(RecordSchema recordSchema, 
TableSchema tableSchema, DMLSettings settings) {
-        final Set<String> normalizedFieldNames = 
getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
+    private void checkValuesForRequiredColumns(RecordSchema recordSchema, 
TableSchema tableSchema, DMLSettings settings, NameNormalizer normalizer) {
+        final Set<String> normalizedFieldNames = 
getNormalizedColumnNames(recordSchema, settings.translateFieldNames, 
normalizer);
 
         for (final String requiredColName : 
tableSchema.getRequiredColumnNames()) {
-            final String normalizedColName = 
ColumnDescription.normalizeColumnName(requiredColName, 
settings.translateFieldNames);
+            final String normalizedColName = 
TableSchema.normalizedName(requiredColName, settings.translateFieldNames, 
normalizer);
             if (!normalizedFieldNames.contains(normalizedColName)) {
                 String missingColMessage = "Record does not have a value for 
the Required column '" + requiredColName + "'";
                 if (settings.failUnmappedColumns) {
@@ -1590,15 +1623,15 @@ public class PutDatabaseRecord extends 
AbstractProcessor {
         return updateKeyColumnNames;
     }
 
-    private Set<String> normalizeKeyColumnNamesAndCheckForValues(RecordSchema 
recordSchema, String updateKeys, DMLSettings settings, Set<String> 
updateKeyColumnNames)
+    private Set<String> normalizeKeyColumnNamesAndCheckForValues(RecordSchema 
recordSchema, String updateKeys, DMLSettings settings, Set<String> 
updateKeyColumnNames, NameNormalizer normalizer)
             throws MalformedRecordException {
         // Create a Set of all normalized Update Key names, and ensure that 
there is a field in the record
         // for each of the Update Key fields.
-        final Set<String> normalizedRecordFieldNames = 
getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
+        final Set<String> normalizedRecordFieldNames = 
getNormalizedColumnNames(recordSchema, settings.translateFieldNames, 
normalizer);
 
         final Set<String> normalizedKeyColumnNames = new HashSet<>();
         for (final String updateKeyColumnName : updateKeyColumnNames) {
-            String normalizedKeyColumnName = 
ColumnDescription.normalizeColumnName(updateKeyColumnName, 
settings.translateFieldNames);
+            String normalizedKeyColumnName = 
TableSchema.normalizedName(updateKeyColumnName, settings.translateFieldNames, 
normalizer);
 
             if (!normalizedRecordFieldNames.contains(normalizedKeyColumnName)) 
{
                 String missingColMessage = "Record does not have a value for 
the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + 
updateKeyColumnName + "'";
@@ -1651,7 +1684,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
             SchemaKey schemaKey = (SchemaKey) o;
 
             if (catalog != null ? !catalog.equals(schemaKey.catalog) : 
schemaKey.catalog != null) return false;
-            if (schemaName != null ? !schemaName.equals(schemaKey.schemaName) 
: schemaKey.schemaName != null) return false;
+            if (schemaName != null ? !schemaName.equals(schemaKey.schemaName) 
: schemaKey.schemaName != null)
+                return false;
             return tableName.equals(schemaKey.tableName);
         }
     }
@@ -1736,6 +1770,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
     static class DMLSettings {
         private final boolean translateFieldNames;
+        private final TranslationStrategy translationStrategy;
+        private final Pattern translationPattern;
         private final boolean ignoreUnmappedFields;
 
         // Is the unmatched column behaviour fail or warning?
@@ -1750,6 +1786,9 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
         DMLSettings(ProcessContext context) {
             translateFieldNames = 
context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
+            translationStrategy = 
TranslationStrategy.valueOf(context.getProperty(TRANSLATION_STRATEGY).getValue());
+            final String translationRegex = 
context.getProperty(TRANSLATION_PATTERN).getValue();
+            translationPattern = translationRegex == null ? null : 
Pattern.compile(translationRegex);
             ignoreUnmappedFields = 
IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
 
             failUnmappedColumns = 
FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
index 239ebede08..00b9d1ba52 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
@@ -40,8 +40,11 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.pattern.DiscontinuedException;
 import org.apache.nifi.processors.standard.db.ColumnDescription;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
 import org.apache.nifi.processors.standard.db.TableNotFoundException;
 import org.apache.nifi.processors.standard.db.TableSchema;
+import org.apache.nifi.processors.standard.db.TranslationStrategy;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
@@ -71,6 +74,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
 
@@ -177,6 +181,27 @@ public class UpdateDatabaseTable extends AbstractProcessor 
{
             .defaultValue("true")
             .build();
 
+    public static final PropertyDescriptor TRANSLATION_STRATEGY = new 
PropertyDescriptor.Builder() // selects which TranslationStrategy constant drives column-name normalization
+            .required(true)
+            .name("Column Name Translation Strategy")
+            .description("The strategy used to normalize table column name. 
Column Name will be uppercased to " +
+                    "do case-insensitive matching irrespective of strategy")
+            .allowableValues(TranslationStrategy.class)
+            .defaultValue(TranslationStrategy.REMOVE_UNDERSCORE.getValue())
+            .dependsOn(TRANSLATE_FIELD_NAMES, 
TRANSLATE_FIELD_NAMES.getDefaultValue()) // tied to TRANSLATE_FIELD_NAMES holding its own default value (presumably "true" -- confirm against that descriptor)
+            .build();
+
+    public static final PropertyDescriptor TRANSLATION_PATTERN = new 
PropertyDescriptor.Builder() // regular expression handed to the PATTERN translation strategy
+            .name("Column Name Translation Pattern")
+            .displayName("Column Name Translation Pattern")
+            .description("Column name will be normalized with this regular 
expression") // NOTE(review): PatternNormalizer deletes every match (replaceAll("")); consider saying so in this description
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .dependsOn(TRANSLATE_FIELD_NAMES, 
TRANSLATE_FIELD_NAMES.getDefaultValue()) // same TRANSLATE_FIELD_NAMES dependency as TRANSLATION_STRATEGY
+            .dependsOn(TRANSLATION_STRATEGY, 
TranslationStrategy.PATTERN.getValue()) // additionally tied to TRANSLATION_STRATEGY being PATTERN
+            .build();
+
     static final PropertyDescriptor UPDATE_FIELD_NAMES = new 
PropertyDescriptor.Builder()
             .name("updatedatabasetable-update-field-names")
             .displayName("Update Field Names")
@@ -282,6 +307,8 @@ public class UpdateDatabaseTable extends AbstractProcessor {
                 CREATE_TABLE,
                 PRIMARY_KEY_FIELDS,
                 TRANSLATE_FIELD_NAMES,
+                TRANSLATION_STRATEGY,
+                TRANSLATION_PATTERN,
                 UPDATE_FIELD_NAMES,
                 RECORD_WRITER_FACTORY,
                 QUOTE_TABLE_IDENTIFIER,
@@ -371,6 +398,13 @@ public class UpdateDatabaseTable extends AbstractProcessor 
{
             final boolean createIfNotExists = 
context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
             final boolean updateFieldNames = 
context.getProperty(UPDATE_FIELD_NAMES).asBoolean();
             final boolean translateFieldNames = 
context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
+            final TranslationStrategy translationStrategy = 
TranslationStrategy.valueOf(context.getProperty(TRANSLATION_STRATEGY).getValue());
+            final String translationRegex = 
context.getProperty(TRANSLATION_PATTERN).getValue();
+            final Pattern translationPattern = translationRegex == null ? null 
: Pattern.compile(translationRegex);
+            NameNormalizer normalizer = null;
+            if (translateFieldNames) {
+                normalizer = 
NameNormalizerFactory.getNormalizer(translationStrategy, translationPattern);
+            }
             if (recordWriterFactory == null && updateFieldNames) {
                 throw new ProcessException("Record Writer must be set if 
'Update Field Names' is true");
             }
@@ -393,7 +427,8 @@ public class UpdateDatabaseTable extends AbstractProcessor {
                     primaryKeyColumnNames = null;
                 }
                 final OutputMetadataHolder outputMetadataHolder = 
checkAndUpdateTableSchema(connection, databaseAdapter, recordSchema,
-                        catalogName, schemaName, tableName, createIfNotExists, 
translateFieldNames, updateFieldNames, primaryKeyColumnNames, quoteTableName, 
quoteColumnNames);
+                        catalogName, schemaName, tableName, createIfNotExists, 
translateFieldNames, normalizer,
+                        updateFieldNames, primaryKeyColumnNames, 
quoteTableName, quoteColumnNames);
                 if (outputMetadataHolder != null) {
                     // The output schema changed (i.e. field names were 
updated), so write out the corresponding FlowFile
                     try {
@@ -457,15 +492,16 @@ public class UpdateDatabaseTable extends 
AbstractProcessor {
 
     private synchronized OutputMetadataHolder checkAndUpdateTableSchema(final 
Connection conn, final DatabaseAdapter databaseAdapter, final RecordSchema 
schema,
                                                                         final 
String catalogName, final String schemaName, final String tableName,
-                                                                        final 
boolean createIfNotExists, final boolean translateFieldNames, final boolean 
updateFieldNames,
-                                                                        final 
Set<String> primaryKeyColumnNames, final boolean quoteTableName, final boolean 
quoteColumnNames) throws IOException {
+                                                                        final 
boolean createIfNotExists, final boolean translateFieldNames, final 
NameNormalizer normalizer,
+                                                                        final 
boolean updateFieldNames, final Set<String> primaryKeyColumnNames, final 
boolean quoteTableName,
+                                                                        final 
boolean quoteColumnNames) throws IOException {
         // Read in the current table metadata, compare it to the reader's 
schema, and
         // add any columns from the schema that are missing in the table
         try (final Statement s = conn.createStatement()) {
             // Determine whether the table exists
             TableSchema tableSchema = null;
             try {
-                tableSchema = TableSchema.from(conn, catalogName, schemaName, 
tableName, translateFieldNames, null, getLogger());
+                tableSchema = TableSchema.from(conn, catalogName, schemaName, 
tableName, translateFieldNames, normalizer, null, getLogger());
             } catch (TableNotFoundException tnfe) {
                 // Do nothing, the value will be populated if necessary
             }
@@ -483,7 +519,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
                         getLogger().debug("Adding column {} to table {}", 
recordFieldName, tableName);
                     }
 
-                    tableSchema = new TableSchema(catalogName, schemaName, 
tableName, columns, translateFieldNames, primaryKeyColumnNames, 
databaseAdapter.getColumnQuoteString());
+                    tableSchema = new TableSchema(catalogName, schemaName, 
tableName, columns, translateFieldNames, normalizer, primaryKeyColumnNames, 
databaseAdapter.getColumnQuoteString());
 
                     final String createTableSql = 
databaseAdapter.getCreateTableStatement(tableSchema, quoteTableName, 
quoteColumnNames);
 
@@ -502,7 +538,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
 
             final List<String> dbColumns = new ArrayList<>();
             for (final ColumnDescription columnDescription : 
tableSchema.getColumnsAsList()) {
-                
dbColumns.add(ColumnDescription.normalizeColumnName(columnDescription.getColumnName(),
 translateFieldNames));
+                
dbColumns.add(TableSchema.normalizedName(columnDescription.getColumnName(), 
translateFieldNames, normalizer));
             }
 
             final List<ColumnDescription> columnsToAdd = new ArrayList<>();
@@ -511,7 +547,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
                 // Handle new columns
                 for (RecordField recordField : schema.getFields()) {
                     final String recordFieldName = recordField.getFieldName();
-                    final String normalizedFieldName = 
ColumnDescription.normalizeColumnName(recordFieldName, translateFieldNames);
+                    final String normalizedFieldName = 
TableSchema.normalizedName(recordFieldName, translateFieldNames, normalizer);
                     if (!dbColumns.contains(normalizedFieldName)) {
                         // The field does not exist in the table, add it
                         ColumnDescription columnToAdd = new 
ColumnDescription(recordFieldName, 
DataTypeUtils.getSQLTypeValue(recordField.getDataType()),
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizer.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizer.java
new file mode 100644
index 0000000000..42c41fab6d
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizer.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.db;
+
+public interface NameNormalizer { // contract for turning a column name into its normalized form
+    /** Returns the normalized form of {@code colName}; the normalization rule is defined by the implementing class. */
+    String getNormalizedName(String colName);
+
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizerFactory.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizerFactory.java
new file mode 100644
index 0000000000..1e84c34375
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizerFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db;
+
+import org.apache.nifi.processors.standard.db.impl.PatternNormalizer;
+import 
org.apache.nifi.processors.standard.db.impl.RemoveAllSpecialCharNormalizer;
+import org.apache.nifi.processors.standard.db.impl.RemoveSpaceNormalizer;
+import 
org.apache.nifi.processors.standard.db.impl.RemoveUnderscoreAndSpaceNormalizer;
+import org.apache.nifi.processors.standard.db.impl.RemoveUnderscoreNormalizer;
+
+import java.util.regex.Pattern;
+
+public class NameNormalizerFactory { // creates the NameNormalizer that corresponds to a TranslationStrategy
+    public static NameNormalizer getNormalizer(TranslationStrategy strategy, 
Pattern regex) {
+        // Every call builds a new normalizer; regex is only passed on for PATTERN and is ignored by the other strategies.
+        return switch (strategy) {
+            case REMOVE_UNDERSCORE -> new RemoveUnderscoreNormalizer();
+            case REMOVE_SPACE -> new RemoveSpaceNormalizer();
+            case REMOVE_UNDERSCORE_AND_SPACE -> new 
RemoveUnderscoreAndSpaceNormalizer();
+            case REMOVE_ALL_SPECIAL_CHAR -> new 
RemoveAllSpecialCharNormalizer();
+            case PATTERN -> new PatternNormalizer(regex);
+        };
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
index 88e81e16be..dc7d3f7db2 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+
 public class TableSchema {
     private final List<String> requiredColumnNames;
     private final Set<String> primaryKeyColumnNames;
@@ -38,18 +39,20 @@ public class TableSchema {
     private final String schemaName;
     private final String tableName;
 
-    public TableSchema(final String catalogName, final String schemaName, 
final String tableName, final List<ColumnDescription> columnDescriptions, final 
boolean translateColumnNames,
-                        final Set<String> primaryKeyColumnNames, final String 
quotedIdentifierString) {
+    public TableSchema(final String catalogName, final String schemaName, 
final String tableName,
+                       final List<ColumnDescription> columnDescriptions, final 
boolean translateColumnNames,
+                       final NameNormalizer normalizer,
+                       final Set<String> primaryKeyColumnNames, final String 
quotedIdentifierString) {
         this.catalogName = catalogName;
         this.schemaName = schemaName;
         this.tableName = tableName;
         this.columns = new LinkedHashMap<>();
         this.primaryKeyColumnNames = primaryKeyColumnNames;
         this.quotedIdentifierString = quotedIdentifierString;
-
         this.requiredColumnNames = new ArrayList<>();
         for (final ColumnDescription desc : columnDescriptions) {
-            
columns.put(ColumnDescription.normalizeColumnName(desc.getColumnName(), 
translateColumnNames), desc);
+            final String colName = normalizedName(desc.getColumnName(), 
translateColumnNames, normalizer);
+            columns.put(colName, desc);
             if (desc.isRequired()) {
                 requiredColumnNames.add(desc.getColumnName());
             }
@@ -89,7 +92,8 @@ public class TableSchema {
     }
 
     public static TableSchema from(final Connection conn, final String 
catalog, final String schema, final String tableName,
-                                   final boolean translateColumnNames, final 
String updateKeys, ComponentLog log) throws SQLException {
+                                   final boolean translateColumnNames, final 
NameNormalizer normalizer,
+                                   final String updateKeys, ComponentLog log) 
throws SQLException {
         final DatabaseMetaData dmd = conn.getMetaData();
 
         try (final ResultSet colrs = dmd.getColumns(catalog, schema, 
tableName, "%")) {
@@ -136,12 +140,21 @@ public class TableSchema {
             } else {
                 // Parse the Update Keys field and normalize the column names
                 for (final String updateKey : updateKeys.split(",")) {
-                    
primaryKeyColumns.add(ColumnDescription.normalizeColumnName(updateKey.trim(), 
translateColumnNames));
+                    final String colName = normalizedName(updateKey, 
translateColumnNames, normalizer);
+                    primaryKeyColumns.add(colName);
+
                 }
             }
 
-            return new TableSchema(catalog, schema, tableName, cols, 
translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString());
+            return new TableSchema(catalog, schema, tableName, cols, 
translateColumnNames, normalizer, primaryKeyColumns, 
dmd.getIdentifierQuoteString());
+        }
+    }
+
+    public static String normalizedName(final String name, final boolean 
translateNames, final NameNormalizer normalizer) { // shared helper: produce the lookup key for a column/field name
+        if (translateNames && normalizer != null) { // translation is skipped when disabled or when no normalizer was supplied
+            return normalizer.getNormalizedName(name).trim().toUpperCase(); // NOTE(review): toUpperCase() is locale-sensitive -- confirm whether Locale.ROOT is needed
         }
+        return name; // NOTE(review): returned as-is (no trim) -- the update-key loop in from() used to trim before normalizing; confirm this is intended
     }
 
     @Override
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TranslationStrategy.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TranslationStrategy.java
new file mode 100644
index 0000000000..7e3470a78a
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TranslationStrategy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported Database column name Translation Strategies
+ */
+public enum TranslationStrategy implements DescribedValue { // each constant carries a display name and a description
+    REMOVE_UNDERSCORE("Remove Underscore",
+            "Underscores '_' will be removed from column names Ex: 'Pics_1_23' 
becomes 'PICS123'"),
+    REMOVE_SPACE("Remove Space",
+            "Spaces will be removed from column names Ex. 'User Name' becomes 
'USERNAME'"),
+    REMOVE_UNDERSCORE_AND_SPACE("Remove Underscores and Spaces",
+            "Spaces and Underscores will be removed from column names Ex. 
'User_1 Name' becomes 'USER1NAME'"),
+    REMOVE_ALL_SPECIAL_CHAR("Remove Regular Expression Characters", // NOTE(review): RemoveAllSpecialCharNormalizer strips every character outside [a-zA-Z0-9]; confirm this label/description wording
+            "Remove Regular Expression Characters " +
+                    "Ex. 'user-id' becomes USERID ,total(estimated) become 
TOTALESTIMATED"),
+    PATTERN("Regular Expression",
+            "Remove characters matching this Regular Expression from the 
column names Ex." +
+                    "1. '\\d' will  Remove all numbers " +
+                    "2. '[^a-zA-Z0-9_]' will remove special characters except 
underscore");
+    private final String displayName; // returned by getDisplayName()
+    private final String description; // returned by getDescription()
+
+    TranslationStrategy(String displayName, String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+    // The value exposed for each constant is its enum name, which is what TranslationStrategy.valueOf(...) parses back.
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PatternNormalizer.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PatternNormalizer.java
new file mode 100644
index 0000000000..72f01dee5d
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PatternNormalizer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+
+import java.util.regex.Pattern;
+
+public class PatternNormalizer implements NameNormalizer { // normalizer that deletes every match of a caller-supplied regex
+    private final Pattern pattern;
+    /** @param pattern regex whose matches are removed from column names; may be null, in which case names are returned unchanged */
+    public PatternNormalizer(Pattern pattern) {
+        this.pattern = pattern;
+    }
+
+    @Override
+    public String getNormalizedName(String colName) {
+        // A null pattern leaves the name untouched; otherwise every match is replaced with the empty string.
+        if (pattern == null) {
+            return colName;
+        } else {
+            return pattern.matcher(colName).replaceAll("");
+        }
+    }
+
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveAllSpecialCharNormalizer.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveAllSpecialCharNormalizer.java
new file mode 100644
index 0000000000..3d9d37700d
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveAllSpecialCharNormalizer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+
+import java.util.regex.Pattern;
+
+public class RemoveAllSpecialCharNormalizer implements NameNormalizer { // normalizer that keeps only ASCII letters and digits
+    private static final Pattern REMOVE_ALL_SPECIAL_CHAR_REGEX = 
Pattern.compile("[^a-zA-Z0-9]"); // matches any single character that is not a-z, A-Z or 0-9 (underscores and spaces included)
+    @Override
+    public String getNormalizedName(String colName) {
+        return REMOVE_ALL_SPECIAL_CHAR_REGEX.matcher(colName).replaceAll(""); // delete every matched character
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveSpaceNormalizer.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveSpaceNormalizer.java
new file mode 100644
index 0000000000..9f20b7ee8d
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveSpaceNormalizer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+
+public class RemoveSpaceNormalizer implements NameNormalizer {
+    /** Returns {@code colName} with every space character (' ') removed; tabs and other whitespace are not touched. */
+    @Override
+    public String getNormalizedName(String colName) {
+        return colName.replace(" ", "");
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreAndSpaceNormalizer.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreAndSpaceNormalizer.java
new file mode 100644
index 0000000000..454684c07f
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreAndSpaceNormalizer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+
+public class RemoveUnderscoreAndSpaceNormalizer implements NameNormalizer {
+
+    /** Returns {@code colName} with every underscore removed and then every U+0020 space removed. */
+    @Override
+    public String getNormalizedName(String colName) {
+        return colName.replace("_", "").replace(" ", "");
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreNormalizer.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreNormalizer.java
new file mode 100644
index 0000000000..410007ce2d
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreNormalizer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+
+public class RemoveUnderscoreNormalizer implements NameNormalizer {
+
+    /** Returns {@code colName} with every underscore character removed; all other characters are kept as-is. */
+    @Override
+    public String getNormalizedName(String colName) {
+        return  colName.replace("_", "");
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 307c230ced..6b85cc3f74 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -22,6 +22,7 @@ import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
 import org.apache.nifi.processors.standard.db.ColumnDescription;
+import org.apache.nifi.processors.standard.db.NameNormalizer;
 import org.apache.nifi.processors.standard.db.TableSchema;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.MalformedRecordException;
@@ -388,7 +389,7 @@ public class PutDatabaseRecordTest {
                         new ColumnDescription("name", 12, true, 255, true),
                         new ColumnDescription("code", 4, true, 10, true)
                 ),
-                false,
+                false, null,
                 new HashSet<>(Arrays.asList("id")),
                 ""
         );
@@ -401,13 +402,13 @@ public class PutDatabaseRecordTest {
         final PutDatabaseRecord.DMLSettings settings = new 
PutDatabaseRecord.DMLSettings(runner.getProcessContext());
 
         assertEquals("INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)",
-                processor.generateInsert(schema, "PERSONS", tableSchema, 
settings).getSql());
+                processor.generateInsert(schema, "PERSONS", tableSchema, 
settings, null).getSql());
         assertEquals("UPDATE PERSONS SET name = ?, code = ? WHERE id = ?",
-                processor.generateUpdate(schema, "PERSONS", null, tableSchema, 
settings).getSql());
+                processor.generateUpdate(schema, "PERSONS", null, tableSchema, 
settings, null).getSql());
         assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (name = ? OR 
(name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))",
-                processor.generateDelete(schema, "PERSONS", null, tableSchema, 
settings).getSql());
+                processor.generateDelete(schema, "PERSONS", null, tableSchema, 
settings, null).getSql());
         assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (code = ? OR 
(code is null AND ? is null))",
-                processor.generateDelete(schema, "PERSONS", "id, code", 
tableSchema, settings).getSql());
+                processor.generateDelete(schema, "PERSONS", "id, code", 
tableSchema, settings, null).getSql());
     }
 
     @Test
@@ -429,7 +430,7 @@ public class PutDatabaseRecordTest {
                         new ColumnDescription("name", 12, true, 255, true),
                         new ColumnDescription("code", 4, true, 10, true)
                 ),
-                false,
+                false, null,
                 new HashSet<>(Arrays.asList("id")),
                 ""
         );
@@ -442,17 +443,17 @@ public class PutDatabaseRecordTest {
         final PutDatabaseRecord.DMLSettings settings = new 
PutDatabaseRecord.DMLSettings(runner.getProcessContext());
 
         SQLDataException e = assertThrows(SQLDataException.class,
-                () -> processor.generateInsert(schema, "PERSONS", tableSchema, 
settings),
+                () -> processor.generateInsert(schema, "PERSONS", tableSchema, 
settings, null),
                 "generateInsert should fail with unmatched fields");
         assertEquals("Cannot map field 'non_existing' to any column in the 
database\nColumns: id,name,code", e.getMessage());
 
         e = assertThrows(SQLDataException.class,
-                () -> processor.generateUpdate(schema, "PERSONS", null, 
tableSchema, settings),
+                () -> processor.generateUpdate(schema, "PERSONS", null, 
tableSchema, settings, null),
                 "generateUpdate should fail with unmatched fields");
         assertEquals("Cannot map field 'non_existing' to any column in the 
database\nColumns: id,name,code", e.getMessage());
 
         e = assertThrows(SQLDataException.class,
-                () -> processor.generateDelete(schema, "PERSONS", null, 
tableSchema, settings),
+                () -> processor.generateDelete(schema, "PERSONS", null, 
tableSchema, settings, null),
                 "generateDelete should fail with unmatched fields");
         assertEquals("Cannot map field 'non_existing' to any column in the 
database\nColumns: id,name,code", e.getMessage());
     }
@@ -725,7 +726,7 @@ public class PutDatabaseRecordTest {
     public void testInsertViaSqlTypeOneStatement(TestCase testCase) throws 
InitializationException, ProcessException, SQLException {
         setRunner(testCase);
 
-        String[] sqlStatements = new String[] {
+        String[] sqlStatements = new String[]{
                 "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"
         };
         testInsertViaSqlTypeStatements(sqlStatements, false);
@@ -736,7 +737,7 @@ public class PutDatabaseRecordTest {
     public void testInsertViaSqlTypeTwoStatementsSemicolon(TestCase testCase) 
throws InitializationException, ProcessException, SQLException {
         setRunner(testCase);
 
-        String[] sqlStatements = new String[] {
+        String[] sqlStatements = new String[]{
                 "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
                 "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);"
         };
@@ -748,7 +749,7 @@ public class PutDatabaseRecordTest {
     public void testInsertViaSqlTypeThreeStatements(TestCase testCase) throws 
InitializationException, ProcessException, SQLException {
         setRunner(testCase);
 
-        String[] sqlStatements = new String[] {
+        String[] sqlStatements = new String[]{
                 "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
                 "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)",
                 "UPDATE PERSONS SET code = 101 WHERE id = 1"
@@ -824,7 +825,7 @@ public class PutDatabaseRecordTest {
     public void testMultipleInsertsForOneStatementViaSqlStatementType(TestCase 
testCase) throws InitializationException, ProcessException, SQLException {
         setRunner(testCase);
 
-        String[] sqlStatements = new String[] {
+        String[] sqlStatements = new String[]{
                 "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"
         };
         testMultipleStatementsViaSqlStatementType(sqlStatements);
@@ -835,7 +836,7 @@ public class PutDatabaseRecordTest {
     public void 
testMultipleInsertsForTwoStatementsViaSqlStatementType(TestCase testCase) 
throws InitializationException, ProcessException, SQLException {
         setRunner(testCase);
 
-        String[] sqlStatements = new String[] {
+        String[] sqlStatements = new String[]{
                 "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
                 "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);"
         };
@@ -1731,7 +1732,7 @@ public class PutDatabaseRecordTest {
                         new ColumnDescription("name", 12, true, 255, true),
                         new ColumnDescription("code", 4, true, 10, true)
                 ),
-                false,
+                false, null,
                 new HashSet<>(Arrays.asList("id")),
                 ""
         );
@@ -2035,7 +2036,7 @@ public class PutDatabaseRecordTest {
         assertEquals(1, resultSet.getInt(1));
 
         Blob blob = resultSet.getBlob(2);
-        assertArrayEquals(new byte[] {(byte) 171, (byte) 205, (byte) 239}, 
blob.getBytes(1, (int) blob.length()));
+        assertArrayEquals(new byte[]{(byte) 171, (byte) 205, (byte) 239}, 
blob.getBytes(1, (int) blob.length()));
 
         stmt.close();
         conn.close();
@@ -2201,7 +2202,7 @@ public class PutDatabaseRecordTest {
         parser.addSchemaField("code", RecordFieldType.INT);
         parser.addSchemaField("content", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()).getFieldType());
 
-        parser.addRecord(1, "rec1", 101, new Integer[] {1, 2, 3});
+        parser.addRecord(1, "rec1", 101, new Integer[]{1, 2, 3});
 
         runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
         runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE);
@@ -2320,8 +2321,8 @@ public class PutDatabaseRecordTest {
         parser.addSchemaField("id", RecordFieldType.INT);
         parser.addSchemaField("name", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()).getFieldType());
 
-        byte[] longVarBinaryValue1 = new byte[] {97, 98, 99};
-        byte[] longVarBinaryValue2 = new byte[] {100, 101, 102};
+        byte[] longVarBinaryValue1 = new byte[]{97, 98, 99};
+        byte[] longVarBinaryValue2 = new byte[]{100, 101, 102};
         parser.addRecord(1, longVarBinaryValue1);
         parser.addRecord(2, longVarBinaryValue2);
 
@@ -2421,7 +2422,7 @@ public class PutDatabaseRecordTest {
 
     static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
         @Override
-        SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String 
tableName, TableSchema tableSchema, DMLSettings settings) throws 
IllegalArgumentException {
+        SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String 
tableName, TableSchema tableSchema, DMLSettings settings, NameNormalizer 
normalizer) throws IllegalArgumentException {
             return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES 
(?,?,?,?)", Arrays.asList(0, 1, 2, 3));
         }
     }
@@ -2459,7 +2460,7 @@ public class PutDatabaseRecordTest {
         @Override
         public Connection getConnection() throws ProcessException {
             try {
-                Connection spyConnection =  
spy(DriverManager.getConnection("jdbc:derby:" + databaseLocation + 
";create=true"));
+                Connection spyConnection = 
spy(DriverManager.getConnection("jdbc:derby:" + databaseLocation + 
";create=true"));
                 
doThrow(SQLFeatureNotSupportedException.class).when(spyConnection).setAutoCommit(false);
                 return spyConnection;
             } catch (final Exception e) {
@@ -2474,6 +2475,7 @@ public class PutDatabaseRecordTest {
             this.rollbackOnFailure = rollbackOnFailure;
             this.batchSize = batchSize;
         }
+
         private Boolean autoCommit = null;
         private Boolean rollbackOnFailure = null;
         private Integer batchSize = null;
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
index 88052b2673..1409f253a0 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
@@ -23,8 +23,11 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.nifi.processors.standard.db.ColumnDescription;
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 import org.apache.nifi.processors.standard.db.TableSchema;
+import org.apache.nifi.processors.standard.db.TranslationStrategy;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -144,14 +147,15 @@ public class TestOracle12DatabaseAdapter {
         Collection<String> uniqueKeyColumnNames = Arrays.asList("column1", 
"column4");
 
         String expected = "MERGE INTO table USING (SELECT ? column1, ? 
column2, ? column3, ? column_4 FROM DUAL) n" +
-        " ON (table.column1 = n.column1 AND table.column_4 = n.column_4) WHEN 
NOT MATCHED THEN" +
-        " INSERT (column1, column2, column3, column_4) VALUES (n.column1, 
n.column2, n.column3, n.column_4)" +
-        " WHEN MATCHED THEN UPDATE SET table.column2 = n.column2, 
table.column3 = n.column3";
+                " ON (table.column1 = n.column1 AND table.column_4 = 
n.column_4) WHEN NOT MATCHED THEN" +
+                " INSERT (column1, column2, column3, column_4) VALUES 
(n.column1, n.column2, n.column3, n.column_4)" +
+                " WHEN MATCHED THEN UPDATE SET table.column2 = n.column2, 
table.column3 = n.column3";
 
         // WHEN
         // THEN
         testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, 
expected);
     }
+
     @Test
     public void testGetCreateTableStatement() {
         assertTrue(db.supportsCreateTableIfNotExists());
@@ -159,7 +163,9 @@ public class TestOracle12DatabaseAdapter {
                 new ColumnDescription("col1", Types.INTEGER, true, 4, false),
                 new ColumnDescription("col2", Types.VARCHAR, false, 2000, true)
         );
-        TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", 
columns, true, Collections.singleton("COL1"), db.getColumnQuoteString());
+        NameNormalizer normalizer = 
NameNormalizerFactory.getNormalizer(TranslationStrategy.REMOVE_UNDERSCORE, 
null);
+        TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", 
columns,
+                true, normalizer, Collections.singleton("COL1"), 
db.getColumnQuoteString());
 
         String expectedStatement = "DECLARE\n\tsql_stmt 
long;\nBEGIN\n\tsql_stmt:='CREATE TABLE "
                 // Strings are returned as VARCHAR2(2000) regardless of 
reported size and that VARCHAR2 is not in java.sql.Types
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
index 2e9f6b6171..99b8e3e3bd 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
@@ -17,8 +17,11 @@
 package org.apache.nifi.processors.standard.db.impl;
 
 import org.apache.nifi.processors.standard.db.ColumnDescription;
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 import org.apache.nifi.processors.standard.db.TableSchema;
+import org.apache.nifi.processors.standard.db.TranslationStrategy;
 import org.junit.jupiter.api.Test;
 
 import java.sql.Types;
@@ -118,7 +121,9 @@ public class TestOracleDatabaseAdapter {
                 new ColumnDescription("col1", Types.INTEGER, true, 4, false),
                 new ColumnDescription("col2", Types.VARCHAR, false, 2000, true)
         );
-        TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", 
columns, true, Collections.singleton("COL1"), db.getColumnQuoteString());
+        NameNormalizer normalizer = 
NameNormalizerFactory.getNormalizer(TranslationStrategy.REMOVE_UNDERSCORE, 
null);
+        TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE", 
columns,
+                true, normalizer, Collections.singleton("COL1"), 
db.getColumnQuoteString());
 
         String expectedStatement = "DECLARE\n\tsql_stmt 
long;\nBEGIN\n\tsql_stmt:='CREATE TABLE "
                 // Strings are returned as VARCHAR2(2000) regardless of 
reported size and that VARCHAR2 is not in java.sql.Types

Reply via email to