This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 502572b2f5 NIFI-11858 Configurable Column Name Normalization in
PutDatabaseRecord and UpdateDatabaseTable
502572b2f5 is described below
commit 502572b2f5911685a5baf8ce70a4c8f5f90b668b
Author: ravisingh <[email protected]>
AuthorDate: Fri Oct 11 17:38:00 2024 -0700
NIFI-11858 Configurable Column Name Normalization in PutDatabaseRecord and
UpdateDatabaseTable
cleaned and required changes for https://github.com/apache/nifi/pull/8995
updated the description to reflect uppercase conversion of column name
uppercased to do case-insensitive matching irrespective of strategy
added example for REMOVE_ALL_SPECIAL_CHAR and PATTERN
Signed-off-by: Matt Burgess <[email protected]>
This closes #9382
---
.../processors/standard/PutDatabaseRecord.java | 207 ++++++++++++---------
.../processors/standard/UpdateDatabaseTable.java | 50 ++++-
.../processors/standard/db/NameNormalizer.java | 24 +++
.../standard/db/NameNormalizerFactory.java | 38 ++++
.../nifi/processors/standard/db/TableSchema.java | 27 ++-
.../standard/db/TranslationStrategy.java | 60 ++++++
.../standard/db/impl/PatternNormalizer.java | 41 ++++
.../db/impl/RemoveAllSpecialCharNormalizer.java | 29 +++
.../standard/db/impl/RemoveSpaceNormalizer.java | 27 +++
.../impl/RemoveUnderscoreAndSpaceNormalizer.java | 28 +++
.../db/impl/RemoveUnderscoreNormalizer.java | 28 +++
.../processors/standard/PutDatabaseRecordTest.java | 44 ++---
.../db/impl/TestOracle12DatabaseAdapter.java | 14 +-
.../db/impl/TestOracleDatabaseAdapter.java | 7 +-
14 files changed, 500 insertions(+), 124 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index a3cfd0c375..19192e57df 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -23,6 +23,7 @@ import static
org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -47,12 +48,15 @@ import java.util.HashSet;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -79,7 +83,10 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processors.standard.db.ColumnDescription;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
import org.apache.nifi.processors.standard.db.TableSchema;
+import org.apache.nifi.processors.standard.db.TranslationStrategy;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
@@ -184,25 +191,26 @@ public class PutDatabaseRecord extends AbstractProcessor {
.build();
static final PropertyDescriptor STATEMENT_TYPE_RECORD_PATH = new Builder()
- .name("Statement Type Record Path")
- .displayName("Statement Type Record Path")
- .description("Specifies a RecordPath to evaluate against each Record
in order to determine the Statement Type. The RecordPath should equate to
either INSERT, UPDATE, UPSERT, or DELETE. "
- + "(Debezium style operation types are also supported: \"r\"
and \"c\" for INSERT, \"u\" for UPDATE, and \"d\" for DELETE)")
- .required(true)
- .addValidator(new RecordPathValidator())
- .expressionLanguageSupported(NONE)
- .dependsOn(STATEMENT_TYPE, USE_RECORD_PATH)
- .build();
+ .name("Statement Type Record Path")
+ .displayName("Statement Type Record Path")
+ .description("Specifies a RecordPath to evaluate against each
Record in order to determine the Statement Type. The RecordPath should equate
to either INSERT, UPDATE, UPSERT, or DELETE. "
+ + "(Debezium style operation types are also supported:
\"r\" and \"c\" for INSERT, \"u\" for UPDATE, and \"d\" for DELETE)")
+ .required(true)
+ .addValidator(new RecordPathValidator())
+ .expressionLanguageSupported(NONE)
+ .dependsOn(STATEMENT_TYPE, USE_RECORD_PATH)
+ .build();
static final PropertyDescriptor DATA_RECORD_PATH = new Builder()
- .name("Data Record Path")
- .displayName("Data Record Path")
- .description("If specified, this property denotes a RecordPath that
will be evaluated against each incoming Record and the Record that results from
evaluating the RecordPath will be sent to" +
- " the database instead of sending the entire incoming Record. If
not specified, the entire incoming Record will be published to the database.")
- .required(false)
- .addValidator(new RecordPathValidator())
- .expressionLanguageSupported(NONE)
- .build();
+ .name("Data Record Path")
+ .displayName("Data Record Path")
+ .description("If specified, this property denotes a RecordPath
that will be evaluated against each incoming" +
+ " Record and the Record that results from evaluating the
RecordPath will be sent to" +
+ " the database instead of sending the entire incoming
Record. If not specified, the entire incoming Record will be published to the
database.")
+ .required(false)
+ .addValidator(new RecordPathValidator())
+ .expressionLanguageSupported(NONE)
+ .build();
static final PropertyDescriptor DBCP_SERVICE = new Builder()
.name("put-db-record-dcbp-service")
@@ -277,6 +285,25 @@ public class PutDatabaseRecord extends AbstractProcessor {
.allowableValues("true", "false")
.defaultValue("true")
.build();
+ public static final PropertyDescriptor TRANSLATION_STRATEGY = new
PropertyDescriptor.Builder()
+ .required(true)
+ .name("Column Name Translation Strategy")
+ .description("The strategy used to normalize table column name.
Column Name will be uppercased to " +
+ "do case-insensitive matching irrespective of strategy")
+ .allowableValues(TranslationStrategy.class)
+ .defaultValue(TranslationStrategy.REMOVE_UNDERSCORE.getValue())
+ .dependsOn(TRANSLATE_FIELD_NAMES,
TRANSLATE_FIELD_NAMES.getDefaultValue())
+ .build();
+
+ public static final PropertyDescriptor TRANSLATION_PATTERN = new
PropertyDescriptor.Builder()
+ .required(true)
+ .name("Column Name Translation Pattern")
+ .description("Column name will be normalized with this regular
expression")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .dependsOn(TRANSLATE_FIELD_NAMES,
TRANSLATE_FIELD_NAMES.getDefaultValue())
+ .dependsOn(TRANSLATION_STRATEGY,
TranslationStrategy.PATTERN.getValue())
+ .build();
static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new Builder()
.name("put-db-record-unmatched-field-behavior")
@@ -415,14 +442,14 @@ public class PutDatabaseRecord extends AbstractProcessor {
});
DB_TYPE = new Builder()
- .name("db-type")
- .displayName("Database Type")
- .description("The type/flavor of database, used for generating
database-specific code. In many cases the Generic type "
- + "should suffice, but some databases (such as Oracle) require
custom SQL clauses. ")
- .allowableValues(dbAdapterValues.toArray(new AllowableValue[0]))
- .defaultValue("Generic")
- .required(false)
- .build();
+ .name("db-type")
+ .displayName("Database Type")
+ .description("The type/flavor of database, used for generating
database-specific code. In many cases the Generic type "
+ + "should suffice, but some databases (such as Oracle)
require custom SQL clauses. ")
+ .allowableValues(dbAdapterValues.toArray(new
AllowableValue[0]))
+ .defaultValue("Generic")
+ .required(false)
+ .build();
properties = List.of(
RECORD_READER_FACTORY,
@@ -436,6 +463,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
TABLE_NAME,
BINARY_STRING_FORMAT,
TRANSLATE_FIELD_NAMES,
+ TRANSLATION_STRATEGY,
+ TRANSLATION_PATTERN,
UNMATCHED_FIELD_BEHAVIOR,
UNMATCHED_COLUMN_BEHAVIOR,
UPDATE_KEYS,
@@ -473,12 +502,12 @@ public class PutDatabaseRecord extends AbstractProcessor {
DatabaseAdapter databaseAdapter =
dbAdapters.get(validationContext.getProperty(DB_TYPE).getValue());
String statementType =
validationContext.getProperty(STATEMENT_TYPE).getValue();
if ((UPSERT_TYPE.equals(statementType) &&
!databaseAdapter.supportsUpsert())
- || (INSERT_IGNORE_TYPE.equals(statementType) &&
!databaseAdapter.supportsInsertIgnore())) {
+ || (INSERT_IGNORE_TYPE.equals(statementType) &&
!databaseAdapter.supportsInsertIgnore())) {
validationResults.add(new ValidationResult.Builder()
- .subject(STATEMENT_TYPE.getDisplayName())
- .valid(false)
- .explanation(databaseAdapter.getName() + " does not support "
+ statementType)
- .build()
+ .subject(STATEMENT_TYPE.getDisplayName())
+ .valid(false)
+ .explanation(databaseAdapter.getName() + " does not
support " + statementType)
+ .build()
);
}
@@ -495,15 +524,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
if (autoCommit != null && autoCommit &&
!isMaxBatchSizeHardcodedToZero(validationContext)) {
- final String explanation = format("'%s' must be hard-coded to
zero when '%s' is set to 'true'."
- + " Batch size equal to zero executes all
statements in a single transaction"
- + " which allows rollback to revert all the
flow file's statements together if an error occurs.",
- MAX_BATCH_SIZE.getDisplayName(),
AUTO_COMMIT.getDisplayName());
-
- validationResults.add(new ValidationResult.Builder()
- .subject(MAX_BATCH_SIZE.getDisplayName())
- .explanation(explanation)
- .build());
+ final String explanation = format("'%s' must be hard-coded to zero
when '%s' is set to 'true'."
+ + " Batch size equal to zero executes all
statements in a single transaction"
+ + " which allows rollback to revert all the flow
file's statements together if an error occurs.",
+ MAX_BATCH_SIZE.getDisplayName(),
AUTO_COMMIT.getDisplayName());
+
+ validationResults.add(new ValidationResult.Builder()
+ .subject(MAX_BATCH_SIZE.getDisplayName())
+ .explanation(explanation)
+ .build());
}
return validationResults;
@@ -692,7 +721,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final String regex = "(?<!\\\\);";
sqlStatements = (sql).split(regex);
} else {
- sqlStatements = new String[] {sql};
+ sqlStatements = new String[]{sql};
}
if (isFirstRecord) {
@@ -735,7 +764,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
private void executeDML(final ProcessContext context, final ProcessSession
session, final FlowFile flowFile,
final Connection con, final RecordReader
recordReader, final String explicitStatementType, final DMLSettings settings)
- throws IllegalArgumentException, MalformedRecordException,
IOException, SQLException {
+ throws IllegalArgumentException, MalformedRecordException,
IOException, SQLException {
final ComponentLog log = getLogger();
@@ -753,13 +782,16 @@ public class PutDatabaseRecord extends AbstractProcessor {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException(format("Cannot process %s
because Table Name is null or empty", flowFile));
}
-
+ final NameNormalizer normalizer = Optional.of(settings)
+ .filter(s -> s.translateFieldNames)
+ .map(s ->
NameNormalizerFactory.getNormalizer(s.translationStrategy,
s.translationPattern))
+ .orElse(null);
final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog,
schemaName, tableName);
final TableSchema tableSchema;
try {
tableSchema = schemaCache.get(schemaKey, key -> {
try {
- final TableSchema schema = TableSchema.from(con, catalog,
schemaName, tableName, settings.translateFieldNames, updateKeys, log);
+ final TableSchema schema = TableSchema.from(con, catalog,
schemaName, tableName, settings.translateFieldNames, normalizer, updateKeys,
log);
getLogger().debug("Fetched Table Schema {} for table name
{}", schema, tableName);
return schema;
} catch (SQLException e) {
@@ -780,7 +812,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
// build the fully qualified table name
- final String fqTableName = generateTableName(settings, catalog,
schemaName, tableName, tableSchema);
+ final String fqTableName = generateTableName(settings, catalog,
schemaName, tableName, tableSchema);
final Map<String, PreparedSqlAndColumns> preparedSql = new HashMap<>();
int currentBatchSize = 0;
@@ -805,15 +837,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
final SqlAndIncludedColumns sqlHolder;
if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
- sqlHolder = generateInsert(recordSchema,
fqTableName, tableSchema, settings);
+ sqlHolder = generateInsert(recordSchema,
fqTableName, tableSchema, settings, normalizer);
} else if
(UPDATE_TYPE.equalsIgnoreCase(statementType)) {
- sqlHolder = generateUpdate(recordSchema,
fqTableName, updateKeys, tableSchema, settings);
+ sqlHolder = generateUpdate(recordSchema,
fqTableName, updateKeys, tableSchema, settings, normalizer);
} else if
(DELETE_TYPE.equalsIgnoreCase(statementType)) {
- sqlHolder = generateDelete(recordSchema,
fqTableName, deleteKeys, tableSchema, settings);
+ sqlHolder = generateDelete(recordSchema,
fqTableName, deleteKeys, tableSchema, settings, normalizer);
} else if
(UPSERT_TYPE.equalsIgnoreCase(statementType)) {
- sqlHolder = generateUpsert(recordSchema,
fqTableName, updateKeys, tableSchema, settings);
+ sqlHolder = generateUpsert(recordSchema,
fqTableName, updateKeys, tableSchema, settings, normalizer);
} else if
(INSERT_IGNORE_TYPE.equalsIgnoreCase(statementType)) {
- sqlHolder = generateInsertIgnore(recordSchema,
fqTableName, updateKeys, tableSchema, settings);
+ sqlHolder = generateInsertIgnore(recordSchema,
fqTableName, updateKeys, tableSchema, settings, normalizer);
} else {
throw new
IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s",
statementType, flowFile));
}
@@ -843,7 +875,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
if (ps != lastPreparedStatement && lastPreparedStatement
!= null) {
batchIndex++;
log.debug("Executing query {} because Statement Type
changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size:
{}",
- sql, flowFile, fieldIndexes, batchIndex,
currentBatchSize);
+ sql, flowFile, fieldIndexes, batchIndex,
currentBatchSize);
lastPreparedStatement.executeBatch();
session.adjustCounter("Batches Executed", 1, false);
@@ -863,7 +895,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final DataType dataType =
dataTypes.get(currentFieldIndex);
final int fieldSqlType =
DataTypeUtils.getSQLTypeValue(dataType);
final String fieldName =
recordSchema.getField(currentFieldIndex).getFieldName();
- String columnName =
ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames);
+ String columnName =
TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer);
int sqlType;
final ColumnDescription column =
columns.get(columnName);
@@ -952,7 +984,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
if (++currentBatchSize == maxBatchSize) {
batchIndex++;
log.debug("Executing query {} because batch reached
max size for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
- sql, flowFile, fieldIndexes, batchIndex,
currentBatchSize);
+ sql, flowFile, fieldIndexes, batchIndex,
currentBatchSize);
session.adjustCounter("Batches Executed", 1, false);
ps.executeBatch();
currentBatchSize = 0;
@@ -1081,7 +1113,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final RecordFieldType fieldType =
fieldValue.getField().getDataType().getFieldType();
if (fieldType != RecordFieldType.RECORD) {
throw new ProcessException("RecordPath " +
dataRecordPath.getPath() + " evaluated against Record expected to return one or
more Records but encountered field of type" +
- " " + fieldType);
+ " " + fieldType);
}
}
@@ -1185,18 +1217,18 @@ public class PutDatabaseRecord extends
AbstractProcessor {
return tableNameBuilder.toString();
}
- private Set<String> getNormalizedColumnNames(final RecordSchema schema,
final boolean translateFieldNames) {
+ private Set<String> getNormalizedColumnNames(final RecordSchema schema,
final boolean translateFieldNames, NameNormalizer normalizer) {
final Set<String> normalizedFieldNames = new HashSet<>();
if (schema != null) {
- schema.getFieldNames().forEach((fieldName) ->
normalizedFieldNames.add(ColumnDescription.normalizeColumnName(fieldName,
translateFieldNames)));
+ schema.getFieldNames().forEach((fieldName) ->
normalizedFieldNames.add(TableSchema.normalizedName(fieldName,
translateFieldNames, normalizer)));
}
return normalizedFieldNames;
}
- SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema,
final String tableName, final TableSchema tableSchema, final DMLSettings
settings)
+ SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema,
final String tableName, final TableSchema tableSchema, final DMLSettings
settings, NameNormalizer normalizer)
throws IllegalArgumentException, SQLException {
- checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
+ checkValuesForRequiredColumns(recordSchema, tableSchema, settings,
normalizer);
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("INSERT INTO ");
@@ -1214,7 +1246,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
- final ColumnDescription desc =
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName,
settings.translateFieldNames));
+ final ColumnDescription desc =
tableSchema.getColumns().get(TableSchema.normalizedName(fieldName,
settings.translateFieldNames, normalizer));
if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" +
fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " :
"") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@@ -1254,13 +1286,13 @@ public class PutDatabaseRecord extends
AbstractProcessor {
}
SqlAndIncludedColumns generateUpsert(final RecordSchema recordSchema,
final String tableName, final String updateKeys,
- final TableSchema tableSchema, final
DMLSettings settings)
- throws IllegalArgumentException, SQLException,
MalformedRecordException {
+ final TableSchema tableSchema, final
DMLSettings settings, NameNormalizer normalizer)
+ throws IllegalArgumentException, SQLException,
MalformedRecordException {
- checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
+ checkValuesForRequiredColumns(recordSchema, tableSchema, settings,
normalizer);
Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName,
updateKeys, tableSchema);
- normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys,
settings, keyColumnNames);
+ normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys,
settings, keyColumnNames, normalizer);
List<String> usedColumnNames = new ArrayList<>();
List<Integer> usedColumnIndices = new ArrayList<>();
@@ -1273,7 +1305,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
- final ColumnDescription desc =
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName,
settings.translateFieldNames));
+ final ColumnDescription desc =
tableSchema.getColumns().get(TableSchema.normalizedName(fieldName,
settings.translateFieldNames, normalizer));
if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" +
fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " :
"") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@@ -1308,13 +1340,13 @@ public class PutDatabaseRecord extends
AbstractProcessor {
}
SqlAndIncludedColumns generateInsertIgnore(final RecordSchema
recordSchema, final String tableName, final String updateKeys,
- final TableSchema tableSchema,
final DMLSettings settings)
+ final TableSchema tableSchema,
final DMLSettings settings, NameNormalizer normalizer)
throws IllegalArgumentException, SQLException,
MalformedRecordException {
- checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
+ checkValuesForRequiredColumns(recordSchema, tableSchema, settings,
normalizer);
Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName,
updateKeys, tableSchema);
- normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys,
settings, keyColumnNames);
+ normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys,
settings, keyColumnNames, normalizer);
List<String> usedColumnNames = new ArrayList<>();
List<Integer> usedColumnIndices = new ArrayList<>();
@@ -1327,7 +1359,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
- final ColumnDescription desc =
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName,
settings.translateFieldNames));
+ final ColumnDescription desc =
tableSchema.getColumns().get(TableSchema.normalizedName(fieldName,
settings.translateFieldNames, normalizer));
if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" +
fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " :
"") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@@ -1363,11 +1395,11 @@ public class PutDatabaseRecord extends
AbstractProcessor {
}
SqlAndIncludedColumns generateUpdate(final RecordSchema recordSchema,
final String tableName, final String updateKeys,
- final TableSchema tableSchema, final
DMLSettings settings)
+ final TableSchema tableSchema, final
DMLSettings settings, NameNormalizer normalizer)
throws IllegalArgumentException, MalformedRecordException,
SQLException {
final Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName,
updateKeys, tableSchema);
- final Set<String> normalizedKeyColumnNames =
normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings,
keyColumnNames);
+ final Set<String> normalizedKeyColumnNames =
normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings,
keyColumnNames, normalizer);
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("UPDATE ");
@@ -1386,8 +1418,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
- final String normalizedColName =
ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames);
- final ColumnDescription desc =
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName,
settings.translateFieldNames));
+ final String normalizedColName =
TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer);
+ final ColumnDescription desc =
tableSchema.getColumns().get(TableSchema.normalizedName(fieldName,
settings.translateFieldNames, normalizer));
if (desc == null) {
if (!settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" +
fieldName + "' to any column in the database\n"
@@ -1426,8 +1458,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
String fieldName = field.getFieldName();
boolean firstUpdateKey = true;
- final String normalizedColName =
ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames);
- final ColumnDescription desc =
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName,
settings.translateFieldNames));
+ final String normalizedColName =
TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer);
+ final ColumnDescription desc =
tableSchema.getColumns().get(TableSchema.normalizedName(fieldName,
settings.translateFieldNames, normalizer));
if (desc != null) {
// Check if this column is a Update Key. If so, add it to
the WHERE clause
@@ -1457,12 +1489,13 @@ public class PutDatabaseRecord extends
AbstractProcessor {
return new SqlAndIncludedColumns(sqlBuilder.toString(),
includedColumns);
}
- SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema,
final String tableName, String deleteKeys, final TableSchema tableSchema, final
DMLSettings settings)
+ SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema,
final String tableName, String deleteKeys, final TableSchema tableSchema,
+ final DMLSettings settings,
NameNormalizer normalizer)
throws IllegalArgumentException, MalformedRecordException,
SQLDataException {
- final Set<String> normalizedFieldNames =
getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
+ final Set<String> normalizedFieldNames =
getNormalizedColumnNames(recordSchema, settings.translateFieldNames,
normalizer);
for (final String requiredColName :
tableSchema.getRequiredColumnNames()) {
- final String normalizedColName =
ColumnDescription.normalizeColumnName(requiredColName,
settings.translateFieldNames);
+ final String normalizedColName =
TableSchema.normalizedName(requiredColName, settings.translateFieldNames,
normalizer);
if (!normalizedFieldNames.contains(normalizedColName)) {
String missingColMessage = "Record does not have a value for
the Required column '" + requiredColName + "'";
if (settings.failUnmappedColumns) {
@@ -1505,7 +1538,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
- final ColumnDescription desc =
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName,
settings.translateFieldNames));
+ final ColumnDescription desc =
tableSchema.getColumns().get(TableSchema.normalizedName(fieldName,
settings.translateFieldNames, normalizer));
if (desc == null && !settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" +
fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " :
"") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@@ -1554,11 +1587,11 @@ public class PutDatabaseRecord extends
AbstractProcessor {
return new SqlAndIncludedColumns(sqlBuilder.toString(),
includedColumns);
}
- private void checkValuesForRequiredColumns(RecordSchema recordSchema,
TableSchema tableSchema, DMLSettings settings) {
- final Set<String> normalizedFieldNames =
getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
+ private void checkValuesForRequiredColumns(RecordSchema recordSchema,
TableSchema tableSchema, DMLSettings settings, NameNormalizer normalizer) {
+ final Set<String> normalizedFieldNames =
getNormalizedColumnNames(recordSchema, settings.translateFieldNames,
normalizer);
for (final String requiredColName :
tableSchema.getRequiredColumnNames()) {
- final String normalizedColName =
ColumnDescription.normalizeColumnName(requiredColName,
settings.translateFieldNames);
+ final String normalizedColName =
TableSchema.normalizedName(requiredColName, settings.translateFieldNames,
normalizer);
if (!normalizedFieldNames.contains(normalizedColName)) {
String missingColMessage = "Record does not have a value for
the Required column '" + requiredColName + "'";
if (settings.failUnmappedColumns) {
@@ -1590,15 +1623,15 @@ public class PutDatabaseRecord extends
AbstractProcessor {
return updateKeyColumnNames;
}
- private Set<String> normalizeKeyColumnNamesAndCheckForValues(RecordSchema
recordSchema, String updateKeys, DMLSettings settings, Set<String>
updateKeyColumnNames)
+ private Set<String> normalizeKeyColumnNamesAndCheckForValues(RecordSchema
recordSchema, String updateKeys, DMLSettings settings, Set<String>
updateKeyColumnNames, NameNormalizer normalizer)
throws MalformedRecordException {
// Create a Set of all normalized Update Key names, and ensure that
there is a field in the record
// for each of the Update Key fields.
- final Set<String> normalizedRecordFieldNames =
getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
+ final Set<String> normalizedRecordFieldNames =
getNormalizedColumnNames(recordSchema, settings.translateFieldNames,
normalizer);
final Set<String> normalizedKeyColumnNames = new HashSet<>();
for (final String updateKeyColumnName : updateKeyColumnNames) {
- String normalizedKeyColumnName =
ColumnDescription.normalizeColumnName(updateKeyColumnName,
settings.translateFieldNames);
+ String normalizedKeyColumnName =
TableSchema.normalizedName(updateKeyColumnName, settings.translateFieldNames,
normalizer);
if (!normalizedRecordFieldNames.contains(normalizedKeyColumnName))
{
String missingColMessage = "Record does not have a value for
the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" +
updateKeyColumnName + "'";
@@ -1651,7 +1684,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
SchemaKey schemaKey = (SchemaKey) o;
if (catalog != null ? !catalog.equals(schemaKey.catalog) :
schemaKey.catalog != null) return false;
- if (schemaName != null ? !schemaName.equals(schemaKey.schemaName)
: schemaKey.schemaName != null) return false;
+ if (schemaName != null ? !schemaName.equals(schemaKey.schemaName)
: schemaKey.schemaName != null)
+ return false;
return tableName.equals(schemaKey.tableName);
}
}
@@ -1736,6 +1770,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
static class DMLSettings {
private final boolean translateFieldNames;
+ private final TranslationStrategy translationStrategy;
+ private final Pattern translationPattern;
private final boolean ignoreUnmappedFields;
// Is the unmatched column behaviour fail or warning?
@@ -1750,6 +1786,9 @@ public class PutDatabaseRecord extends AbstractProcessor {
DMLSettings(ProcessContext context) {
translateFieldNames =
context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
+ translationStrategy =
TranslationStrategy.valueOf(context.getProperty(TRANSLATION_STRATEGY).getValue());
+ final String translationRegex =
context.getProperty(TRANSLATION_PATTERN).getValue();
+ translationPattern = translationRegex == null ? null :
Pattern.compile(translationRegex);
ignoreUnmappedFields =
IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
failUnmappedColumns =
FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
index 239ebede08..00b9d1ba52 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
@@ -40,8 +40,11 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processors.standard.db.ColumnDescription;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
import org.apache.nifi.processors.standard.db.TableNotFoundException;
import org.apache.nifi.processors.standard.db.TableSchema;
+import org.apache.nifi.processors.standard.db.TranslationStrategy;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@@ -71,6 +74,7 @@ import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
+import java.util.regex.Pattern;
import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
@@ -177,6 +181,27 @@ public class UpdateDatabaseTable extends AbstractProcessor
{
.defaultValue("true")
.build();
+ public static final PropertyDescriptor TRANSLATION_STRATEGY = new
PropertyDescriptor.Builder()
+ .required(true)
+ .name("Column Name Translation Strategy")
+ .description("The strategy used to normalize table column name.
Column Name will be uppercased to " +
+ "do case-insensitive matching irrespective of strategy")
+ .allowableValues(TranslationStrategy.class)
+ .defaultValue(TranslationStrategy.REMOVE_UNDERSCORE.getValue())
+ .dependsOn(TRANSLATE_FIELD_NAMES,
TRANSLATE_FIELD_NAMES.getDefaultValue())
+ .build();
+
+ public static final PropertyDescriptor TRANSLATION_PATTERN = new
PropertyDescriptor.Builder()
+ .name("Column Name Translation Pattern")
+ .displayName("Column Name Translation Pattern")
+ .description("Column name will be normalized with this regular
expression")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .dependsOn(TRANSLATE_FIELD_NAMES,
TRANSLATE_FIELD_NAMES.getDefaultValue())
+ .dependsOn(TRANSLATION_STRATEGY,
TranslationStrategy.PATTERN.getValue())
+ .build();
+
static final PropertyDescriptor UPDATE_FIELD_NAMES = new
PropertyDescriptor.Builder()
.name("updatedatabasetable-update-field-names")
.displayName("Update Field Names")
@@ -282,6 +307,8 @@ public class UpdateDatabaseTable extends AbstractProcessor {
CREATE_TABLE,
PRIMARY_KEY_FIELDS,
TRANSLATE_FIELD_NAMES,
+ TRANSLATION_STRATEGY,
+ TRANSLATION_PATTERN,
UPDATE_FIELD_NAMES,
RECORD_WRITER_FACTORY,
QUOTE_TABLE_IDENTIFIER,
@@ -371,6 +398,13 @@ public class UpdateDatabaseTable extends AbstractProcessor
{
final boolean createIfNotExists =
context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final boolean updateFieldNames =
context.getProperty(UPDATE_FIELD_NAMES).asBoolean();
final boolean translateFieldNames =
context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
+ final TranslationStrategy translationStrategy =
TranslationStrategy.valueOf(context.getProperty(TRANSLATION_STRATEGY).getValue());
+ final String translationRegex =
context.getProperty(TRANSLATION_PATTERN).getValue();
+ final Pattern translationPattern = translationRegex == null ? null
: Pattern.compile(translationRegex);
+ NameNormalizer normalizer = null;
+ if (translateFieldNames) {
+ normalizer =
NameNormalizerFactory.getNormalizer(translationStrategy, translationPattern);
+ }
if (recordWriterFactory == null && updateFieldNames) {
throw new ProcessException("Record Writer must be set if
'Update Field Names' is true");
}
@@ -393,7 +427,8 @@ public class UpdateDatabaseTable extends AbstractProcessor {
primaryKeyColumnNames = null;
}
final OutputMetadataHolder outputMetadataHolder =
checkAndUpdateTableSchema(connection, databaseAdapter, recordSchema,
- catalogName, schemaName, tableName, createIfNotExists,
translateFieldNames, updateFieldNames, primaryKeyColumnNames, quoteTableName,
quoteColumnNames);
+ catalogName, schemaName, tableName, createIfNotExists,
translateFieldNames, normalizer,
+ updateFieldNames, primaryKeyColumnNames,
quoteTableName, quoteColumnNames);
if (outputMetadataHolder != null) {
// The output schema changed (i.e. field names were
updated), so write out the corresponding FlowFile
try {
@@ -457,15 +492,16 @@ public class UpdateDatabaseTable extends
AbstractProcessor {
private synchronized OutputMetadataHolder checkAndUpdateTableSchema(final
Connection conn, final DatabaseAdapter databaseAdapter, final RecordSchema
schema,
final
String catalogName, final String schemaName, final String tableName,
- final
boolean createIfNotExists, final boolean translateFieldNames, final boolean
updateFieldNames,
- final
Set<String> primaryKeyColumnNames, final boolean quoteTableName, final boolean
quoteColumnNames) throws IOException {
+ final
boolean createIfNotExists, final boolean translateFieldNames, final
NameNormalizer normalizer,
+ final
boolean updateFieldNames, final Set<String> primaryKeyColumnNames, final
boolean quoteTableName,
+ final
boolean quoteColumnNames) throws IOException {
// Read in the current table metadata, compare it to the reader's
schema, and
// add any columns from the schema that are missing in the table
try (final Statement s = conn.createStatement()) {
// Determine whether the table exists
TableSchema tableSchema = null;
try {
- tableSchema = TableSchema.from(conn, catalogName, schemaName,
tableName, translateFieldNames, null, getLogger());
+ tableSchema = TableSchema.from(conn, catalogName, schemaName,
tableName, translateFieldNames, normalizer, null, getLogger());
} catch (TableNotFoundException tnfe) {
// Do nothing, the value will be populated if necessary
}
@@ -483,7 +519,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
getLogger().debug("Adding column {} to table {}",
recordFieldName, tableName);
}
- tableSchema = new TableSchema(catalogName, schemaName,
tableName, columns, translateFieldNames, primaryKeyColumnNames,
databaseAdapter.getColumnQuoteString());
+ tableSchema = new TableSchema(catalogName, schemaName,
tableName, columns, translateFieldNames, normalizer, primaryKeyColumnNames,
databaseAdapter.getColumnQuoteString());
final String createTableSql =
databaseAdapter.getCreateTableStatement(tableSchema, quoteTableName,
quoteColumnNames);
@@ -502,7 +538,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
final List<String> dbColumns = new ArrayList<>();
for (final ColumnDescription columnDescription :
tableSchema.getColumnsAsList()) {
-
dbColumns.add(ColumnDescription.normalizeColumnName(columnDescription.getColumnName(),
translateFieldNames));
+
dbColumns.add(TableSchema.normalizedName(columnDescription.getColumnName(),
translateFieldNames, normalizer));
}
final List<ColumnDescription> columnsToAdd = new ArrayList<>();
@@ -511,7 +547,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
// Handle new columns
for (RecordField recordField : schema.getFields()) {
final String recordFieldName = recordField.getFieldName();
- final String normalizedFieldName =
ColumnDescription.normalizeColumnName(recordFieldName, translateFieldNames);
+ final String normalizedFieldName =
TableSchema.normalizedName(recordFieldName, translateFieldNames, normalizer);
if (!dbColumns.contains(normalizedFieldName)) {
// The field does not exist in the table, add it
ColumnDescription columnToAdd = new
ColumnDescription(recordFieldName,
DataTypeUtils.getSQLTypeValue(recordField.getDataType()),
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizer.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizer.java
new file mode 100644
index 0000000000..42c41fab6d
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizer.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.db;
+
+public interface NameNormalizer {
+
+ String getNormalizedName(String colName);
+
+}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizerFactory.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizerFactory.java
new file mode 100644
index 0000000000..1e84c34375
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/NameNormalizerFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db;
+
+import org.apache.nifi.processors.standard.db.impl.PatternNormalizer;
+import
org.apache.nifi.processors.standard.db.impl.RemoveAllSpecialCharNormalizer;
+import org.apache.nifi.processors.standard.db.impl.RemoveSpaceNormalizer;
+import
org.apache.nifi.processors.standard.db.impl.RemoveUnderscoreAndSpaceNormalizer;
+import org.apache.nifi.processors.standard.db.impl.RemoveUnderscoreNormalizer;
+
+import java.util.regex.Pattern;
+
+public class NameNormalizerFactory {
+ public static NameNormalizer getNormalizer(TranslationStrategy strategy,
Pattern regex) {
+
+ return switch (strategy) {
+ case REMOVE_UNDERSCORE -> new RemoveUnderscoreNormalizer();
+ case REMOVE_SPACE -> new RemoveSpaceNormalizer();
+ case REMOVE_UNDERSCORE_AND_SPACE -> new
RemoveUnderscoreAndSpaceNormalizer();
+ case REMOVE_ALL_SPECIAL_CHAR -> new
RemoveAllSpecialCharNormalizer();
+ case PATTERN -> new PatternNormalizer(regex);
+ };
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
index 88e81e16be..dc7d3f7db2 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+
public class TableSchema {
private final List<String> requiredColumnNames;
private final Set<String> primaryKeyColumnNames;
@@ -38,18 +39,20 @@ public class TableSchema {
private final String schemaName;
private final String tableName;
- public TableSchema(final String catalogName, final String schemaName,
final String tableName, final List<ColumnDescription> columnDescriptions, final
boolean translateColumnNames,
- final Set<String> primaryKeyColumnNames, final String
quotedIdentifierString) {
+ public TableSchema(final String catalogName, final String schemaName,
final String tableName,
+ final List<ColumnDescription> columnDescriptions, final
boolean translateColumnNames,
+ final NameNormalizer normalizer,
+ final Set<String> primaryKeyColumnNames, final String
quotedIdentifierString) {
this.catalogName = catalogName;
this.schemaName = schemaName;
this.tableName = tableName;
this.columns = new LinkedHashMap<>();
this.primaryKeyColumnNames = primaryKeyColumnNames;
this.quotedIdentifierString = quotedIdentifierString;
-
this.requiredColumnNames = new ArrayList<>();
for (final ColumnDescription desc : columnDescriptions) {
-
columns.put(ColumnDescription.normalizeColumnName(desc.getColumnName(),
translateColumnNames), desc);
+ final String colName = normalizedName(desc.getColumnName(),
translateColumnNames, normalizer);
+ columns.put(colName, desc);
if (desc.isRequired()) {
requiredColumnNames.add(desc.getColumnName());
}
@@ -89,7 +92,8 @@ public class TableSchema {
}
public static TableSchema from(final Connection conn, final String
catalog, final String schema, final String tableName,
- final boolean translateColumnNames, final
String updateKeys, ComponentLog log) throws SQLException {
+ final boolean translateColumnNames, final
NameNormalizer normalizer,
+ final String updateKeys, ComponentLog log)
throws SQLException {
final DatabaseMetaData dmd = conn.getMetaData();
try (final ResultSet colrs = dmd.getColumns(catalog, schema,
tableName, "%")) {
@@ -136,12 +140,21 @@ public class TableSchema {
} else {
// Parse the Update Keys field and normalize the column names
for (final String updateKey : updateKeys.split(",")) {
-
primaryKeyColumns.add(ColumnDescription.normalizeColumnName(updateKey.trim(),
translateColumnNames));
+ final String colName = normalizedName(updateKey,
translateColumnNames, normalizer);
+ primaryKeyColumns.add(colName);
+
}
}
- return new TableSchema(catalog, schema, tableName, cols,
translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString());
+ return new TableSchema(catalog, schema, tableName, cols,
translateColumnNames, normalizer, primaryKeyColumns,
dmd.getIdentifierQuoteString());
+ }
+ }
+
+ public static String normalizedName(final String name, final boolean
translateNames, final NameNormalizer normalizer) {
+ if (translateNames && normalizer != null) {
+ return normalizer.getNormalizedName(name).trim().toUpperCase();
}
+ return name;
}
@Override
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TranslationStrategy.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TranslationStrategy.java
new file mode 100644
index 0000000000..7e3470a78a
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TranslationStrategy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported Database column name Translation Strategies
+ */
+public enum TranslationStrategy implements DescribedValue {
+ REMOVE_UNDERSCORE("Remove Underscore",
+ "Underscores '_' will be removed from column names Ex: 'Pics_1_23'
becomes 'PICS123'"),
+ REMOVE_SPACE("Remove Space",
+ "Spaces will be removed from column names Ex. 'User Name' becomes
'USERNAME'"),
+ REMOVE_UNDERSCORE_AND_SPACE("Remove Underscores and Spaces",
+ "Spaces and Underscores will be removed from column names Ex.
'User_1 Name' becomes 'USER1NAME'"),
+ REMOVE_ALL_SPECIAL_CHAR("Remove Regular Expression Characters",
+ "Remove Regular Expression Characters " +
+ "Ex. 'user-id' becomes USERID ,total(estimated) become
TOTALESTIMATED"),
+ PATTERN("Regular Expression",
+ "Remove characters matching this Regular Expression from the
column names Ex." +
+ "1. '\\d' will Remove all numbers " +
+ "2. '[^a-zA-Z0-9_]' will remove special characters except
underscore");
+ private final String displayName;
+ private final String description;
+
+ TranslationStrategy(String displayName, String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PatternNormalizer.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PatternNormalizer.java
new file mode 100644
index 0000000000..72f01dee5d
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PatternNormalizer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+
+import java.util.regex.Pattern;
+
+public class PatternNormalizer implements NameNormalizer {
+ private final Pattern pattern;
+
+ public PatternNormalizer(Pattern pattern) {
+ this.pattern = pattern;
+ }
+
+ @Override
+ public String getNormalizedName(String colName) {
+
+ if (pattern == null) {
+ return colName;
+ } else {
+ return pattern.matcher(colName).replaceAll("");
+ }
+ }
+
+}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveAllSpecialCharNormalizer.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveAllSpecialCharNormalizer.java
new file mode 100644
index 0000000000..3d9d37700d
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveAllSpecialCharNormalizer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+
+import java.util.regex.Pattern;
+
+public class RemoveAllSpecialCharNormalizer implements NameNormalizer {
+ private static final Pattern REMOVE_ALL_SPECIAL_CHAR_REGEX =
Pattern.compile("[^a-zA-Z0-9]");
+ @Override
+ public String getNormalizedName(String colName) {
+ return REMOVE_ALL_SPECIAL_CHAR_REGEX.matcher(colName).replaceAll("");
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveSpaceNormalizer.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveSpaceNormalizer.java
new file mode 100644
index 0000000000..9f20b7ee8d
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveSpaceNormalizer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+
+public class RemoveSpaceNormalizer implements NameNormalizer {
+
+ @Override
+ public String getNormalizedName(String colName) {
+ return colName.replace(" ", "");
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreAndSpaceNormalizer.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreAndSpaceNormalizer.java
new file mode 100644
index 0000000000..454684c07f
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreAndSpaceNormalizer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+
+public class RemoveUnderscoreAndSpaceNormalizer implements NameNormalizer {
+
+
+ @Override
+ public String getNormalizedName(String colName) {
+ return colName.replace("_", "").replace(" ", "");
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreNormalizer.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreNormalizer.java
new file mode 100644
index 0000000000..410007ce2d
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/RemoveUnderscoreNormalizer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+
+public class RemoveUnderscoreNormalizer implements NameNormalizer {
+
+
+ @Override
+ public String getNormalizedName(String colName) {
+ return colName.replace("_", "");
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 307c230ced..6b85cc3f74 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -22,6 +22,7 @@ import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processors.standard.db.ColumnDescription;
+import org.apache.nifi.processors.standard.db.NameNormalizer;
import org.apache.nifi.processors.standard.db.TableSchema;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.MalformedRecordException;
@@ -388,7 +389,7 @@ public class PutDatabaseRecordTest {
new ColumnDescription("name", 12, true, 255, true),
new ColumnDescription("code", 4, true, 10, true)
),
- false,
+ false, null,
new HashSet<>(Arrays.asList("id")),
""
);
@@ -401,13 +402,13 @@ public class PutDatabaseRecordTest {
final PutDatabaseRecord.DMLSettings settings = new
PutDatabaseRecord.DMLSettings(runner.getProcessContext());
assertEquals("INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)",
- processor.generateInsert(schema, "PERSONS", tableSchema,
settings).getSql());
+ processor.generateInsert(schema, "PERSONS", tableSchema,
settings, null).getSql());
assertEquals("UPDATE PERSONS SET name = ?, code = ? WHERE id = ?",
- processor.generateUpdate(schema, "PERSONS", null, tableSchema,
settings).getSql());
+ processor.generateUpdate(schema, "PERSONS", null, tableSchema,
settings, null).getSql());
assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (name = ? OR
(name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))",
- processor.generateDelete(schema, "PERSONS", null, tableSchema,
settings).getSql());
+ processor.generateDelete(schema, "PERSONS", null, tableSchema,
settings, null).getSql());
assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (code = ? OR
(code is null AND ? is null))",
- processor.generateDelete(schema, "PERSONS", "id, code",
tableSchema, settings).getSql());
+ processor.generateDelete(schema, "PERSONS", "id, code",
tableSchema, settings, null).getSql());
}
@Test
@@ -429,7 +430,7 @@ public class PutDatabaseRecordTest {
new ColumnDescription("name", 12, true, 255, true),
new ColumnDescription("code", 4, true, 10, true)
),
- false,
+ false, null,
new HashSet<>(Arrays.asList("id")),
""
);
@@ -442,17 +443,17 @@ public class PutDatabaseRecordTest {
final PutDatabaseRecord.DMLSettings settings = new
PutDatabaseRecord.DMLSettings(runner.getProcessContext());
SQLDataException e = assertThrows(SQLDataException.class,
- () -> processor.generateInsert(schema, "PERSONS", tableSchema,
settings),
+ () -> processor.generateInsert(schema, "PERSONS", tableSchema,
settings, null),
"generateInsert should fail with unmatched fields");
assertEquals("Cannot map field 'non_existing' to any column in the
database\nColumns: id,name,code", e.getMessage());
e = assertThrows(SQLDataException.class,
- () -> processor.generateUpdate(schema, "PERSONS", null,
tableSchema, settings),
+ () -> processor.generateUpdate(schema, "PERSONS", null,
tableSchema, settings, null),
"generateUpdate should fail with unmatched fields");
assertEquals("Cannot map field 'non_existing' to any column in the
database\nColumns: id,name,code", e.getMessage());
e = assertThrows(SQLDataException.class,
- () -> processor.generateDelete(schema, "PERSONS", null,
tableSchema, settings),
+ () -> processor.generateDelete(schema, "PERSONS", null,
tableSchema, settings, null),
"generateDelete should fail with unmatched fields");
assertEquals("Cannot map field 'non_existing' to any column in the
database\nColumns: id,name,code", e.getMessage());
}
@@ -725,7 +726,7 @@ public class PutDatabaseRecordTest {
public void testInsertViaSqlTypeOneStatement(TestCase testCase) throws
InitializationException, ProcessException, SQLException {
setRunner(testCase);
- String[] sqlStatements = new String[] {
+ String[] sqlStatements = new String[]{
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"
};
testInsertViaSqlTypeStatements(sqlStatements, false);
@@ -736,7 +737,7 @@ public class PutDatabaseRecordTest {
public void testInsertViaSqlTypeTwoStatementsSemicolon(TestCase testCase)
throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
- String[] sqlStatements = new String[] {
+ String[] sqlStatements = new String[]{
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
"INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);"
};
@@ -748,7 +749,7 @@ public class PutDatabaseRecordTest {
public void testInsertViaSqlTypeThreeStatements(TestCase testCase) throws
InitializationException, ProcessException, SQLException {
setRunner(testCase);
- String[] sqlStatements = new String[] {
+ String[] sqlStatements = new String[]{
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
"INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)",
"UPDATE PERSONS SET code = 101 WHERE id = 1"
@@ -824,7 +825,7 @@ public class PutDatabaseRecordTest {
public void testMultipleInsertsForOneStatementViaSqlStatementType(TestCase
testCase) throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
- String[] sqlStatements = new String[] {
+ String[] sqlStatements = new String[]{
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"
};
testMultipleStatementsViaSqlStatementType(sqlStatements);
@@ -835,7 +836,7 @@ public class PutDatabaseRecordTest {
public void
testMultipleInsertsForTwoStatementsViaSqlStatementType(TestCase testCase)
throws InitializationException, ProcessException, SQLException {
setRunner(testCase);
- String[] sqlStatements = new String[] {
+ String[] sqlStatements = new String[]{
"INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
"INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);"
};
@@ -1731,7 +1732,7 @@ public class PutDatabaseRecordTest {
new ColumnDescription("name", 12, true, 255, true),
new ColumnDescription("code", 4, true, 10, true)
),
- false,
+ false, null,
new HashSet<>(Arrays.asList("id")),
""
);
@@ -2035,7 +2036,7 @@ public class PutDatabaseRecordTest {
assertEquals(1, resultSet.getInt(1));
Blob blob = resultSet.getBlob(2);
- assertArrayEquals(new byte[] {(byte) 171, (byte) 205, (byte) 239},
blob.getBytes(1, (int) blob.length()));
+ assertArrayEquals(new byte[]{(byte) 171, (byte) 205, (byte) 239},
blob.getBytes(1, (int) blob.length()));
stmt.close();
conn.close();
@@ -2201,7 +2202,7 @@ public class PutDatabaseRecordTest {
parser.addSchemaField("code", RecordFieldType.INT);
parser.addSchemaField("content",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()).getFieldType());
- parser.addRecord(1, "rec1", 101, new Integer[] {1, 2, 3});
+ parser.addRecord(1, "rec1", 101, new Integer[]{1, 2, 3});
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.INSERT_TYPE);
@@ -2320,8 +2321,8 @@ public class PutDatabaseRecordTest {
parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()).getFieldType());
- byte[] longVarBinaryValue1 = new byte[] {97, 98, 99};
- byte[] longVarBinaryValue2 = new byte[] {100, 101, 102};
+ byte[] longVarBinaryValue1 = new byte[]{97, 98, 99};
+ byte[] longVarBinaryValue2 = new byte[]{100, 101, 102};
parser.addRecord(1, longVarBinaryValue1);
parser.addRecord(2, longVarBinaryValue2);
@@ -2421,7 +2422,7 @@ public class PutDatabaseRecordTest {
static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
@Override
- SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String
tableName, TableSchema tableSchema, DMLSettings settings) throws
IllegalArgumentException {
+ SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String
tableName, TableSchema tableSchema, DMLSettings settings, NameNormalizer
normalizer) throws IllegalArgumentException {
return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES
(?,?,?,?)", Arrays.asList(0, 1, 2, 3));
}
}
@@ -2459,7 +2460,7 @@ public class PutDatabaseRecordTest {
@Override
public Connection getConnection() throws ProcessException {
try {
- Connection spyConnection =
spy(DriverManager.getConnection("jdbc:derby:" + databaseLocation +
";create=true"));
+ Connection spyConnection =
spy(DriverManager.getConnection("jdbc:derby:" + databaseLocation +
";create=true"));
doThrow(SQLFeatureNotSupportedException.class).when(spyConnection).setAutoCommit(false);
return spyConnection;
} catch (final Exception e) {
@@ -2474,6 +2475,7 @@ public class PutDatabaseRecordTest {
this.rollbackOnFailure = rollbackOnFailure;
this.batchSize = batchSize;
}
+
private Boolean autoCommit = null;
private Boolean rollbackOnFailure = null;
private Integer batchSize = null;
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
index 88052b2673..1409f253a0 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
@@ -23,8 +23,11 @@ import java.util.Collections;
import java.util.List;
import org.apache.nifi.processors.standard.db.ColumnDescription;
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.TableSchema;
+import org.apache.nifi.processors.standard.db.TranslationStrategy;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -144,14 +147,15 @@ public class TestOracle12DatabaseAdapter {
Collection<String> uniqueKeyColumnNames = Arrays.asList("column1",
"column4");
String expected = "MERGE INTO table USING (SELECT ? column1, ?
column2, ? column3, ? column_4 FROM DUAL) n" +
- " ON (table.column1 = n.column1 AND table.column_4 = n.column_4) WHEN
NOT MATCHED THEN" +
- " INSERT (column1, column2, column3, column_4) VALUES (n.column1,
n.column2, n.column3, n.column_4)" +
- " WHEN MATCHED THEN UPDATE SET table.column2 = n.column2,
table.column3 = n.column3";
+ " ON (table.column1 = n.column1 AND table.column_4 =
n.column_4) WHEN NOT MATCHED THEN" +
+ " INSERT (column1, column2, column3, column_4) VALUES
(n.column1, n.column2, n.column3, n.column_4)" +
+ " WHEN MATCHED THEN UPDATE SET table.column2 = n.column2,
table.column3 = n.column3";
// WHEN
// THEN
testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames,
expected);
}
+
@Test
public void testGetCreateTableStatement() {
assertTrue(db.supportsCreateTableIfNotExists());
@@ -159,7 +163,9 @@ public class TestOracle12DatabaseAdapter {
new ColumnDescription("col1", Types.INTEGER, true, 4, false),
new ColumnDescription("col2", Types.VARCHAR, false, 2000, true)
);
- TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE",
columns, true, Collections.singleton("COL1"), db.getColumnQuoteString());
+ NameNormalizer normalizer =
NameNormalizerFactory.getNormalizer(TranslationStrategy.REMOVE_UNDERSCORE,
null);
+ TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE",
columns,
+ true, normalizer, Collections.singleton("COL1"),
db.getColumnQuoteString());
String expectedStatement = "DECLARE\n\tsql_stmt
long;\nBEGIN\n\tsql_stmt:='CREATE TABLE "
// Strings are returned as VARCHAR2(2000) regardless of
reported size and that VARCHAR2 is not in java.sql.Types
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
index 2e9f6b6171..99b8e3e3bd 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
@@ -17,8 +17,11 @@
package org.apache.nifi.processors.standard.db.impl;
import org.apache.nifi.processors.standard.db.ColumnDescription;
+import org.apache.nifi.processors.standard.db.NameNormalizer;
+import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.TableSchema;
+import org.apache.nifi.processors.standard.db.TranslationStrategy;
import org.junit.jupiter.api.Test;
import java.sql.Types;
@@ -118,7 +121,9 @@ public class TestOracleDatabaseAdapter {
new ColumnDescription("col1", Types.INTEGER, true, 4, false),
new ColumnDescription("col2", Types.VARCHAR, false, 2000, true)
);
- TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE",
columns, true, Collections.singleton("COL1"), db.getColumnQuoteString());
+ NameNormalizer normalizer =
NameNormalizerFactory.getNormalizer(TranslationStrategy.REMOVE_UNDERSCORE,
null);
+ TableSchema tableSchema = new TableSchema("USERS", null, "TEST_TABLE",
columns,
+ true, normalizer, Collections.singleton("COL1"),
db.getColumnQuoteString());
String expectedStatement = "DECLARE\n\tsql_stmt
long;\nBEGIN\n\tsql_stmt:='CREATE TABLE "
// Strings are returned as VARCHAR2(2000) regardless of
reported size and that VARCHAR2 is not in java.sql.Types