This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
     new b9076ca  NIFI-7989: Add Update Field Names and Record Writer to UpdateHiveTable processors
b9076ca is described below
commit b9076ca26eb444ebda28ef6c5efbb759e4d1af0f
Author: Matthew Burgess <[email protected]>
AuthorDate: Thu Jan 7 18:02:39 2021 -0500
    NIFI-7989: Add Update Field Names and Record Writer to UpdateHiveTable processors
    NIFI-7989: Only rewrite records if a field name doesn't match a table column name exactly
NIFI-7989: Rewrite records for created tables if Update Field Names is set
This closes #4750.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../nifi/processors/hive/UpdateHiveTable.java | 220 +++++++++++++++++++-
.../nifi/processors/hive/UpdateHive3Table.java | 222 +++++++++++++++++++-
.../nifi/processors/hive/TestUpdateHive3Table.java | 100 +++++++++
.../src/test/resources/user_mixedcase.avsc | 26 +++
.../nifi/processors/hive/UpdateHive_1_1Table.java | 223 ++++++++++++++++++++-
5 files changed, 761 insertions(+), 30 deletions(-)
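In short, the change matches each incoming record field to its Hive column case-insensitively, renames fields to the table's exact casing, and engages the Record Writer only when at least one name differs. A minimal, self-contained Java sketch of that matching logic, for orientation only (the committed code below builds the map from NiFi RecordField/RecordSchema objects inside checkAndUpdateTableSchema):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FieldNameMappingSketch {

    // Maps each record field name to the exactly-cased Hive column name.
    // Fields with no matching column are kept as-is.
    static Map<String, String> mapFieldNames(List<String> recordFields, List<String> hiveColumns) {
        final Map<String, String> fieldMap = new LinkedHashMap<>();
        for (String field : recordFields) {
            String target = field; // default: keep the field name unchanged
            for (String column : hiveColumns) {
                if (field.equalsIgnoreCase(column)) {
                    target = column; // adopt the table's exact casing
                    break;
                }
            }
            fieldMap.put(field, target);
        }
        return fieldMap;
    }

    public static void main(String[] args) {
        List<String> fields = List.of("NAME", "Favorite_number", "extra_field");
        List<String> columns = List.of("name", "favorite_number", "favorite_color");
        // Prints: {NAME=name, Favorite_number=favorite_number, extra_field=extra_field}
        System.out.println(mapFieldNames(fields, columns));
        // The processor only rewrites records when any key differs from its value here
        boolean needsUpdating = mapFieldNames(fields, columns).entrySet().stream()
                .anyMatch(e -> !e.getKey().equals(e.getValue()));
        System.out.println("needsUpdating = " + needsUpdating);
    }
}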
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHiveTable.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHiveTable.java
index aef713d..89e0ced 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHiveTable.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHiveTable.java
@@ -27,10 +27,13 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.dbcp.hive.HiveDBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -41,8 +44,15 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;
@@ -55,9 +65,12 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -73,7 +86,10 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "output.table", description = "This
attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the target table
name."),
@WritesAttribute(attribute = "output.path", description = "This
attribute is written on the flow files routed to the 'success' "
- + "and 'failure' relationships, and contains the path on the
file system to the table (or partition location if the table is partitioned).")
+ + "and 'failure' relationships, and contains the path on the
file system to the table (or partition location if the table is partitioned)."),
+ @WritesAttribute(attribute = "mime.type", description = "Sets the
mime.type attribute to the MIME Type specified by the Record Writer, only if a
Record Writer is specified "
+ + "and Update Field Names is 'true'."),
+ @WritesAttribute(attribute = "record.count", description = "Sets the
number of records in the FlowFile, only if a Record Writer is specified and
Update Field Names is 'true'.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@RequiresInstanceClassLoading
@@ -163,6 +179,31 @@ public class UpdateHiveTable extends AbstractProcessor {
.dependsOn(CREATE_TABLE, CREATE_IF_NOT_EXISTS)
.build();
+    static final PropertyDescriptor UPDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
+            .name("hive-update-field-names")
+            .displayName("Update Field Names")
+            .description("This property indicates whether to update the output schema such that the field names are set to the exact column names from the specified "
+                    + "table. This should be used if the incoming record field names may not match the table's column names in terms of upper- and lower-case. For example, this property should be "
+                    + "set to true if the output FlowFile (and target table storage) is Avro format, as Hive/Impala expects the field names to match the column names exactly.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+            .name("hive-record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer should use Inherit Schema to emulate the inferred schema behavior, i.e. "
+                    + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types. If Create Table Strategy is set to "
+                    + "'Create If Not Exists', the Record Writer's output format must match the Record Reader's format in order for the data to be placed in the created table location. Note that "
+                    + "this property is only used if 'Update Field Names' is set to true and the field names do not all match the column names exactly. If no "
+                    + "update is needed for any field names (or 'Update Field Names' is false), the Record Writer is not used and instead the input FlowFile is routed to success or failure "
+                    + "without modification.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .dependsOn(UPDATE_FIELD_NAMES, "true")
+            .required(true)
+            .build();
+
    static final PropertyDescriptor EXTERNAL_TABLE_LOCATION = new PropertyDescriptor.Builder()
            .name("hive-external-table-location")
            .displayName("External Table Location")
@@ -237,6 +278,8 @@ public class UpdateHiveTable extends AbstractProcessor {
props.add(TABLE_MANAGEMENT_STRATEGY);
props.add(EXTERNAL_TABLE_LOCATION);
props.add(TABLE_STORAGE_FORMAT);
+ props.add(UPDATE_FIELD_NAMES);
+ props.add(RECORD_WRITER_FACTORY);
props.add(QUERY_TIMEOUT);
propertyDescriptors = Collections.unmodifiableList(props);
@@ -258,6 +301,30 @@ public class UpdateHiveTable extends AbstractProcessor {
}
@Override
+ protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
+ List<ValidationResult> validationResults = new
ArrayList<>(super.customValidate(validationContext));
+ final boolean recordWriterFactorySet =
validationContext.getProperty(RECORD_WRITER_FACTORY).isSet();
+ final boolean createIfNotExists =
validationContext.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
+ final boolean updateFieldNames =
validationContext.getProperty(UPDATE_FIELD_NAMES).asBoolean();
+
+ if (!recordWriterFactorySet && updateFieldNames) {
+ validationResults.add(new
ValidationResult.Builder().subject(RECORD_WRITER_FACTORY.getDisplayName())
+ .explanation("Record Writer must be set if 'Update Field
Names' is true").valid(false).build());
+ }
+ final String tableManagementStrategy =
validationContext.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
+ final boolean managedTable;
+ if
(!ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
+ managedTable =
MANAGED_TABLE.getValue().equals(tableManagementStrategy);
+ // Ensure valid configuration for external tables
+ if (createIfNotExists && !managedTable &&
!validationContext.getProperty(EXTERNAL_TABLE_LOCATION).isSet()) {
+ validationResults.add(new
ValidationResult.Builder().subject(EXTERNAL_TABLE_LOCATION.getDisplayName())
+ .explanation("External Table Location must be set when
Table Management Strategy is set to External").valid(false).build());
+ }
+ }
+ return validationResults;
+ }
+
+ @Override
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
FlowFile flowFile = session.get();
@@ -266,6 +333,7 @@ public class UpdateHiveTable extends AbstractProcessor {
}
        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory recordWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String partitionClauseString = context.getProperty(PARTITION_CLAUSE).evaluateAttributeExpressions(flowFile).getValue();
List<String> partitionClauseElements = null;
@@ -292,6 +360,15 @@ public class UpdateHiveTable extends AbstractProcessor {
                    new Object[]{RecordReader.class.getSimpleName(), flowFile},
                    rrfe
            );
+            // Since we are wrapping the exceptions above there should always be a cause
+            // but it's possible it might not have a message. This handles that by logging
+            // the name of the class thrown.
+            Throwable c = rrfe.getCause();
+            if (c != null) {
+                session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
+            } else {
+                session.putAttribute(flowFile, "record.error.message", rrfe.getClass().getCanonicalName() + " Thrown");
+            }
session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -299,6 +376,10 @@ public class UpdateHiveTable extends AbstractProcessor {
RecordSchema recordSchema = reader.getSchema();
        final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
+        final boolean updateFieldNames = context.getProperty(UPDATE_FIELD_NAMES).asBoolean();
+        if (recordWriterFactory == null && updateFieldNames) {
+            throw new ProcessException("Record Writer must be set if 'Update Field Names' is true");
+        }
        final String tableManagementStrategy = context.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
        final boolean managedTable;
        if (ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
@@ -331,8 +412,54 @@ public class UpdateHiveTable extends AbstractProcessor {
        final String storageFormat = context.getProperty(TABLE_STORAGE_FORMAT).getValue();
        final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
        try (final Connection connection = dbcpService.getConnection()) {
-            checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, partitionClauseElements, createIfNotExists, externalTableLocation, storageFormat);
-            flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
+            final Map<String,String> attributes = new HashMap<>(flowFile.getAttributes());
+            OutputMetadataHolder outputMetadataHolder = checkAndUpdateTableSchema(attributes, connection, recordSchema, tableName, partitionClauseElements,
+                    createIfNotExists, externalTableLocation, storageFormat, updateFieldNames);
+            if (outputMetadataHolder != null) {
+                // The output schema changed (i.e. field names were updated), so write out the corresponding FlowFile
+                try {
+                    final FlowFile inputFlowFile = flowFile;
+                    flowFile = session.write(flowFile, (in, out) -> {
+
+                        // if we fail to create the RecordReader then we want to route to failure, so we need to
+                        // handle this separately from the other IOExceptions which normally route to retry
+                        final RecordReader recordReader;
+                        final RecordSetWriter recordSetWriter;
+                        try {
+                            recordReader = recordReaderFactory.createRecordReader(inputFlowFile, in, getLogger());
+                            recordSetWriter = recordWriterFactory.createWriter(getLogger(), outputMetadataHolder.getOutputSchema(), out, attributes);
+                        } catch (Exception e) {
+                            if(e instanceof IOException) {
+                                throw (IOException) e;
+                            }
+                            throw new IOException(new RecordReaderFactoryException("Unable to create RecordReader", e));
+                        }
+
+                        WriteResult writeResult = updateRecords(recordSchema, outputMetadataHolder, recordReader, recordSetWriter);
+                        recordSetWriter.flush();
+                        recordSetWriter.close();
+                        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                        attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
+                        attributes.putAll(writeResult.getAttributes());
+                    });
+                } catch (final Exception e) {
+                    getLogger().error("Failed to process {}; will route to failure", new Object[]{flowFile, e});
+                    // Since we are wrapping the exceptions above there should always be a cause
+                    // but it's possible it might not have a message. This handles that by logging
+                    // the name of the class thrown.
+                    Throwable c = e.getCause();
+                    if (c != null) {
+                        session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
+                    } else {
+                        session.putAttribute(flowFile, "record.error.message", e.getClass().getCanonicalName() + " Thrown");
+                    }
+                    session.transfer(flowFile, REL_FAILURE);
+                    return;
+                }
+
+            }
+            attributes.put(ATTR_OUTPUT_TABLE, tableName);
+            flowFile = session.putAllAttributes(flowFile, attributes);
            session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL());
session.transfer(flowFile, REL_SUCCESS);
}
@@ -351,9 +478,9 @@ public class UpdateHiveTable extends AbstractProcessor {
}
}
-    private synchronized void checkAndUpdateTableSchema(final ProcessSession session, final FlowFile flowFile, final Connection conn, final RecordSchema schema,
-                                                        final String tableName, List<String> partitionClause, final boolean createIfNotExists,
-                                                        final String externalTableLocation, final String storageFormat) throws IOException {
+    private synchronized OutputMetadataHolder checkAndUpdateTableSchema(Map<String,String> attributes, final Connection conn, final RecordSchema schema,
+                                                                        final String tableName, List<String> partitionClause, final boolean createIfNotExists,
+                                                                        final String externalTableLocation, final String storageFormat, final boolean updateFieldNames) throws IOException {
        // Read in the current table metadata, compare it to the reader's schema, and
        // add any columns from the schema that are missing in the table
try (Statement s = conn.createStatement()) {
@@ -461,7 +588,7 @@ public class UpdateHiveTable extends AbstractProcessor {
String partitionColumnName;
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) &&
!columnName.startsWith("#")) {
- hiveColumns.add(columnName);
+ partitionColumns.add(columnName);
}
// If the column was a header, check for a blank line to
follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
@@ -483,7 +610,7 @@ public class UpdateHiveTable extends AbstractProcessor {
for (int i = 0; i < partitionClauseSize; i++) {
String partitionName = partitionClause.get(i).split("
")[0];
- String partitionValue =
flowFile.getAttribute(partitionName);
+ String partitionValue = attributes.get(partitionName);
if (StringUtils.isEmpty(partitionValue)) {
throw new IOException("No value found for partition
value attribute '" + partitionName + "'");
}
@@ -557,9 +684,84 @@ public class UpdateHiveTable extends AbstractProcessor {
                outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
            }
-            session.putAttribute(flowFile, ATTR_OUTPUT_PATH, outputPath);
+            // If updating field names, return a new RecordSchema, otherwise return null
+            OutputMetadataHolder outputMetadataHolder;
+            if (updateFieldNames) {
+                List<RecordField> inputRecordFields = schema.getFields();
+                List<RecordField> outputRecordFields = new ArrayList<>();
+                Map<String,String> fieldMap = new HashMap<>();
+                boolean needsUpdating = false;
+
+                for (RecordField inputRecordField : inputRecordFields) {
+                    final String inputRecordFieldName = inputRecordField.getFieldName();
+                    boolean found = false;
+                    for (String hiveColumnName : hiveColumns) {
+                        if (inputRecordFieldName.equalsIgnoreCase(hiveColumnName)) {
+                            // Set a flag if the field name doesn't match the column name exactly. This overall flag will determine whether
+                            // the records need updating (if true) or not (if false)
+                            if (!inputRecordFieldName.equals(hiveColumnName)) {
+                                needsUpdating = true;
+                            }
+                            fieldMap.put(inputRecordFieldName, hiveColumnName);
+                            outputRecordFields.add(new RecordField(hiveColumnName, inputRecordField.getDataType(), inputRecordField.getDefaultValue(), inputRecordField.isNullable()));
+                            found = true;
+                            break;
+                        }
+                    }
+                    if (!found) {
+                        // If the input field wasn't a Hive table column, add it back to the schema as-is
+                        fieldMap.put(inputRecordFieldName, inputRecordFieldName);
+                    }
+                }
+                outputMetadataHolder = needsUpdating ? new OutputMetadataHolder(new SimpleRecordSchema(outputRecordFields), fieldMap)
+                        : null;
+            } else {
+                outputMetadataHolder = null;
+            }
+ attributes.put(ATTR_OUTPUT_PATH, outputPath);
+ return outputMetadataHolder;
} catch (Exception e) {
throw new IOException(e);
}
}
+
+    private synchronized WriteResult updateRecords(final RecordSchema inputRecordSchema, final OutputMetadataHolder outputMetadataHolder,
+                                                   final RecordReader reader, final RecordSetWriter writer) throws IOException {
+        try {
+            writer.beginRecordSet();
+            Record inputRecord;
+            while((inputRecord = reader.nextRecord()) != null) {
+                List<RecordField> inputRecordFields = inputRecordSchema.getFields();
+                Map<String,Object> outputRecordFields = new HashMap<>(inputRecordFields.size());
+                // Copy values from input field name to output field name
+                for(Map.Entry<String,String> mapping : outputMetadataHolder.getFieldMap().entrySet()) {
+                    outputRecordFields.put(mapping.getValue(), inputRecord.getValue(mapping.getKey()));
+                }
+                Record outputRecord = new MapRecord(outputMetadataHolder.getOutputSchema(), outputRecordFields);
+                writer.write(outputRecord);
+            }
+            return writer.finishRecordSet();
+
+        } catch (MalformedRecordException mre) {
+            throw new IOException("Error reading records: "+mre.getMessage(), mre);
+        }
+ }
+ }
+
+ private static class OutputMetadataHolder {
+ private final RecordSchema outputSchema;
+ private final Map<String,String> fieldMap;
+
+ public OutputMetadataHolder(RecordSchema outputSchema, Map<String,
String> fieldMap) {
+ this.outputSchema = outputSchema;
+ this.fieldMap = fieldMap;
+ }
+
+ public RecordSchema getOutputSchema() {
+ return outputSchema;
+ }
+
+ public Map<String, String> getFieldMap() {
+ return fieldMap;
+ }
+ }
}
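The per-record work in updateRecords above reduces to copying each value from its input field name to the table-cased output name and rebuilding the record against the renamed schema. A hedged, standalone illustration using the same NiFi record classes that appear in the diff; the sample field names and values are invented:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

public class RemapExample {
    public static void main(String[] args) {
        // Output schema uses the table's exact column casing
        RecordSchema outputSchema = new SimpleRecordSchema(List.of(
                new RecordField("name", RecordFieldType.STRING.getDataType()),
                new RecordField("favorite_number", RecordFieldType.INT.getDataType())));

        // fieldMap: input field name -> output (column) name, as built by checkAndUpdateTableSchema
        Map<String, String> fieldMap = Map.of("NAME", "name", "Favorite_number", "favorite_number");

        // An incoming record's values, keyed by its mixed-case field names (invented sample data)
        Map<String, Object> inputValues = Map.of("NAME", "alice", "Favorite_number", 7);

        // Copy each value from the input field name to the output field name
        Map<String, Object> outputValues = new HashMap<>();
        fieldMap.forEach((in, out) -> outputValues.put(out, inputValues.get(in)));
        Record outputRecord = new MapRecord(outputSchema, outputValues);

        System.out.println(outputRecord.getValue("name"));            // alice
        System.out.println(outputRecord.getValue("favorite_number")); // 7
    }
}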
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive3Table.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive3Table.java
index aa8f13c..ba53d6e 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive3Table.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive3Table.java
@@ -27,10 +27,13 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.dbcp.hive.Hive3DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -41,8 +44,15 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;
@@ -55,9 +65,12 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -73,7 +86,10 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "output.table", description = "This
attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the target table
name."),
@WritesAttribute(attribute = "output.path", description = "This
attribute is written on the flow files routed to the 'success' "
- + "and 'failure' relationships, and contains the path on the
file system to the table (or partition location if the table is partitioned).")
+ + "and 'failure' relationships, and contains the path on the
file system to the table (or partition location if the table is partitioned)."),
+ @WritesAttribute(attribute = "mime.type", description = "Sets the
mime.type attribute to the MIME Type specified by the Record Writer, only if a
Record Writer is specified "
+ + "and Update Field Names is 'true'."),
+ @WritesAttribute(attribute = "record.count", description = "Sets the
number of records in the FlowFile, only if a Record Writer is specified and
Update Field Names is 'true'.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@RequiresInstanceClassLoading
@@ -163,6 +179,31 @@ public class UpdateHive3Table extends AbstractProcessor {
.dependsOn(CREATE_TABLE, CREATE_IF_NOT_EXISTS)
.build();
+    static final PropertyDescriptor UPDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
+            .name("hive3-update-field-names")
+            .displayName("Update Field Names")
+            .description("This property indicates whether to update the output schema such that the field names are set to the exact column names from the specified "
+                    + "table. This should be used if the incoming record field names may not match the table's column names in terms of upper- and lower-case. For example, this property should be "
+                    + "set to true if the output FlowFile (and target table storage) is Avro format, as Hive/Impala expects the field names to match the column names exactly.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+            .name("hive3-record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer should use Inherit Schema to emulate the inferred schema behavior, i.e. "
+                    + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types. If Create Table Strategy is set to "
+                    + "'Create If Not Exists', the Record Writer's output format must match the Record Reader's format in order for the data to be placed in the created table location. Note that "
+                    + "this property is only used if 'Update Field Names' is set to true and the field names do not all match the column names exactly. If no "
+                    + "update is needed for any field names (or 'Update Field Names' is false), the Record Writer is not used and instead the input FlowFile is routed to success or failure "
+                    + "without modification.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .dependsOn(UPDATE_FIELD_NAMES, "true")
+            .required(true)
+            .build();
+
    static final PropertyDescriptor EXTERNAL_TABLE_LOCATION = new PropertyDescriptor.Builder()
            .name("hive3-external-table-location")
            .displayName("External Table Location")
@@ -237,6 +278,8 @@ public class UpdateHive3Table extends AbstractProcessor {
props.add(TABLE_MANAGEMENT_STRATEGY);
props.add(EXTERNAL_TABLE_LOCATION);
props.add(TABLE_STORAGE_FORMAT);
+ props.add(UPDATE_FIELD_NAMES);
+ props.add(RECORD_WRITER_FACTORY);
props.add(QUERY_TIMEOUT);
propertyDescriptors = Collections.unmodifiableList(props);
@@ -258,6 +301,30 @@ public class UpdateHive3Table extends AbstractProcessor {
}
@Override
+ protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
+ List<ValidationResult> validationResults = new
ArrayList<>(super.customValidate(validationContext));
+ final boolean recordWriterFactorySet =
validationContext.getProperty(RECORD_WRITER_FACTORY).isSet();
+ final boolean createIfNotExists =
validationContext.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
+ final boolean updateFieldNames =
validationContext.getProperty(UPDATE_FIELD_NAMES).asBoolean();
+
+ if (!recordWriterFactorySet && updateFieldNames) {
+ validationResults.add(new
ValidationResult.Builder().subject(RECORD_WRITER_FACTORY.getDisplayName())
+ .explanation("Record Writer must be set if 'Update Field
Names' is true").valid(false).build());
+ }
+ final String tableManagementStrategy =
validationContext.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
+ final boolean managedTable;
+ if
(!ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
+ managedTable =
MANAGED_TABLE.getValue().equals(tableManagementStrategy);
+ // Ensure valid configuration for external tables
+ if (createIfNotExists && !managedTable &&
!validationContext.getProperty(EXTERNAL_TABLE_LOCATION).isSet()) {
+ validationResults.add(new
ValidationResult.Builder().subject(EXTERNAL_TABLE_LOCATION.getDisplayName())
+ .explanation("External Table Location must be set when
Table Management Strategy is set to External").valid(false).build());
+ }
+ }
+ return validationResults;
+ }
+
+ @Override
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
FlowFile flowFile = session.get();
@@ -266,6 +333,7 @@ public class UpdateHive3Table extends AbstractProcessor {
}
        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory recordWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String partitionClauseString = context.getProperty(PARTITION_CLAUSE).evaluateAttributeExpressions(flowFile).getValue();
List<String> partitionClauseElements = null;
@@ -292,13 +360,26 @@ public class UpdateHive3Table extends AbstractProcessor {
                    new Object[]{RecordReader.class.getSimpleName(), flowFile},
                    rrfe
            );
+            // Since we are wrapping the exceptions above there should always be a cause
+            // but it's possible it might not have a message. This handles that by logging
+            // the name of the class thrown.
+            Throwable c = rrfe.getCause();
+            if (c != null) {
+                session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
+            } else {
+                session.putAttribute(flowFile, "record.error.message", rrfe.getClass().getCanonicalName() + " Thrown");
+            }
+ }
session.transfer(flowFile, REL_FAILURE);
return;
}
-        RecordSchema recordSchema = reader.getSchema();
+        final RecordSchema recordSchema = reader.getSchema();
        final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
+        final boolean updateFieldNames = context.getProperty(UPDATE_FIELD_NAMES).asBoolean();
+        if (recordWriterFactory == null && updateFieldNames) {
+            throw new ProcessException("Record Writer must be set if 'Update Field Names' is true");
+        }
        final String tableManagementStrategy = context.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
        final boolean managedTable;
        if (ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
@@ -333,8 +414,54 @@ public class UpdateHive3Table extends AbstractProcessor {
        final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class);
        try (final Connection connection = dbcpService.getConnection()) {
-            checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, partitionClauseElements, createIfNotExists, externalTableLocation, storageFormat);
-            flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
+            final Map<String,String> attributes = new HashMap<>(flowFile.getAttributes());
+            OutputMetadataHolder outputMetadataHolder = checkAndUpdateTableSchema(attributes, connection, recordSchema, tableName, partitionClauseElements,
+                    createIfNotExists, externalTableLocation, storageFormat, updateFieldNames);
+            if (outputMetadataHolder != null) {
+                // The output schema changed (i.e. field names were updated), so write out the corresponding FlowFile
+                try {
+                    final FlowFile inputFlowFile = flowFile;
+                    flowFile = session.write(flowFile, (in, out) -> {
+
+                        // if we fail to create the RecordReader then we want to route to failure, so we need to
+                        // handle this separately from the other IOExceptions which normally route to retry
+                        final RecordReader recordReader;
+                        final RecordSetWriter recordSetWriter;
+                        try {
+                            recordReader = recordReaderFactory.createRecordReader(inputFlowFile, in, getLogger());
+                            recordSetWriter = recordWriterFactory.createWriter(getLogger(), outputMetadataHolder.getOutputSchema(), out, attributes);
+                        } catch (Exception e) {
+                            if(e instanceof IOException) {
+                                throw (IOException) e;
+                            }
+                            throw new IOException(new RecordReaderFactoryException("Unable to create RecordReader", e));
+                        }
+
+                        WriteResult writeResult = updateRecords(recordSchema, outputMetadataHolder, recordReader, recordSetWriter);
+                        recordSetWriter.flush();
+                        recordSetWriter.close();
+                        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                        attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
+                        attributes.putAll(writeResult.getAttributes());
+                    });
+                } catch (final Exception e) {
+                    getLogger().error("Failed to process {}; will route to failure", new Object[]{flowFile, e});
+                    // Since we are wrapping the exceptions above there should always be a cause
+                    // but it's possible it might not have a message. This handles that by logging
+                    // the name of the class thrown.
+                    Throwable c = e.getCause();
+                    if (c != null) {
+                        session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
+                    } else {
+                        session.putAttribute(flowFile, "record.error.message", e.getClass().getCanonicalName() + " Thrown");
+                    }
+                    session.transfer(flowFile, REL_FAILURE);
+                    return;
+                }
+
+            }
+            attributes.put(ATTR_OUTPUT_TABLE, tableName);
+            flowFile = session.putAllAttributes(flowFile, attributes);
            session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL());
session.transfer(flowFile, REL_SUCCESS);
}
@@ -351,9 +478,9 @@ public class UpdateHive3Table extends AbstractProcessor {
}
}
-    private synchronized void checkAndUpdateTableSchema(final ProcessSession session, final FlowFile flowFile, final Connection conn, final RecordSchema schema,
-                                                        final String tableName, List<String> partitionClause, final boolean createIfNotExists,
-                                                        final String externalTableLocation, final String storageFormat) throws IOException {
+    private synchronized OutputMetadataHolder checkAndUpdateTableSchema(Map<String,String> attributes, final Connection conn, final RecordSchema schema,
+                                                                        final String tableName, List<String> partitionClause, final boolean createIfNotExists,
+                                                                        final String externalTableLocation, final String storageFormat, final boolean updateFieldNames) throws IOException {
        // Read in the current table metadata, compare it to the reader's schema, and
        // add any columns from the schema that are missing in the table
try (Statement s = conn.createStatement()) {
@@ -461,7 +588,7 @@ public class UpdateHive3Table extends AbstractProcessor {
String partitionColumnName;
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) &&
!columnName.startsWith("#")) {
- hiveColumns.add(columnName);
+ partitionColumns.add(columnName);
}
// If the column was a header, check for a blank line to
follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
@@ -483,7 +610,7 @@ public class UpdateHive3Table extends AbstractProcessor {
for (int i = 0; i < partitionClauseSize; i++) {
String partitionName = partitionClause.get(i).split("
")[0];
- String partitionValue =
flowFile.getAttribute(partitionName);
+ String partitionValue = attributes.get(partitionName);
if (StringUtils.isEmpty(partitionValue)) {
throw new IOException("No value found for partition
value attribute '" + partitionName + "'");
}
@@ -557,9 +684,84 @@ public class UpdateHive3Table extends AbstractProcessor {
                outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
            }
-            session.putAttribute(flowFile, ATTR_OUTPUT_PATH, outputPath);
+            // If updating field names, return a new RecordSchema, otherwise return null
+            OutputMetadataHolder outputMetadataHolder;
+            if (updateFieldNames) {
+                List<RecordField> inputRecordFields = schema.getFields();
+                List<RecordField> outputRecordFields = new ArrayList<>();
+                Map<String,String> fieldMap = new HashMap<>();
+                boolean needsUpdating = false;
+
+                for (RecordField inputRecordField : inputRecordFields) {
+                    final String inputRecordFieldName = inputRecordField.getFieldName();
+                    boolean found = false;
+                    for (String hiveColumnName : hiveColumns) {
+                        if (inputRecordFieldName.equalsIgnoreCase(hiveColumnName)) {
+                            // Set a flag if the field name doesn't match the column name exactly. This overall flag will determine whether
+                            // the records need updating (if true) or not (if false)
+                            if (!inputRecordFieldName.equals(hiveColumnName)) {
+                                needsUpdating = true;
+                            }
+                            fieldMap.put(inputRecordFieldName, hiveColumnName);
+                            outputRecordFields.add(new RecordField(hiveColumnName, inputRecordField.getDataType(), inputRecordField.getDefaultValue(), inputRecordField.isNullable()));
+                            found = true;
+                            break;
+                        }
+                    }
+                    if (!found) {
+                        // If the input field wasn't a Hive table column, add it back to the schema as-is
+                        fieldMap.put(inputRecordFieldName, inputRecordFieldName);
+                    }
+                }
+                outputMetadataHolder = needsUpdating ? new OutputMetadataHolder(new SimpleRecordSchema(outputRecordFields), fieldMap)
+                        : null;
+            } else {
+                outputMetadataHolder = null;
+            }
+ attributes.put(ATTR_OUTPUT_PATH, outputPath);
+ return outputMetadataHolder;
} catch (Exception e) {
throw new IOException(e);
}
}
+
+    private synchronized WriteResult updateRecords(final RecordSchema inputRecordSchema, final OutputMetadataHolder outputMetadataHolder,
+                                                   final RecordReader reader, final RecordSetWriter writer) throws IOException {
+        try {
+            writer.beginRecordSet();
+            Record inputRecord;
+            while((inputRecord = reader.nextRecord()) != null) {
+                List<RecordField> inputRecordFields = inputRecordSchema.getFields();
+                Map<String,Object> outputRecordFields = new HashMap<>(inputRecordFields.size());
+                // Copy values from input field name to output field name
+                for(Map.Entry<String,String> mapping : outputMetadataHolder.getFieldMap().entrySet()) {
+                    outputRecordFields.put(mapping.getValue(), inputRecord.getValue(mapping.getKey()));
+                }
+                Record outputRecord = new MapRecord(outputMetadataHolder.getOutputSchema(), outputRecordFields);
+                writer.write(outputRecord);
+            }
+            return writer.finishRecordSet();
+
+        } catch (MalformedRecordException mre) {
+            throw new IOException("Error reading records: "+mre.getMessage(), mre);
+        }
+ }
+ }
+
+ private static class OutputMetadataHolder {
+ private final RecordSchema outputSchema;
+ private final Map<String,String> fieldMap;
+
+ public OutputMetadataHolder(RecordSchema outputSchema, Map<String,
String> fieldMap) {
+ this.outputSchema = outputSchema;
+ this.fieldMap = fieldMap;
+ }
+
+ public RecordSchema getOutputSchema() {
+ return outputSchema;
+ }
+
+ public Map<String, String> getFieldMap() {
+ return fieldMap;
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestUpdateHive3Table.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestUpdateHive3Table.java
index 4eaa7e4..d4be1e8 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestUpdateHive3Table.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestUpdateHive3Table.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.csv.CSVRecordSetWriter;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.dbcp.hive.Hive3DBCPService;
import org.apache.nifi.logging.ComponentLog;
@@ -30,6 +31,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
@@ -395,6 +397,104 @@ public class TestUpdateHive3Table {
runner.assertTransferCount(UpdateHive3Table.REL_FAILURE, 1);
}
+    @Test
+    public void testUpdateFields() throws Exception {
+        final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user_mixedcase.avsc"), StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+        configure(processor, 3);
+        runner.setProperty(UpdateHive3Table.TABLE_NAME, "users");
+
+        RecordSetWriterFactory recordWriter = new CSVRecordSetWriter();
+        runner.addControllerService("writer", recordWriter);
+        runner.enableControllerService(recordWriter);
+        runner.setProperty(UpdateHive3Table.UPDATE_FIELD_NAMES, "true");
+        runner.assertNotValid();
+        runner.setProperty(UpdateHive3Table.RECORD_WRITER_FACTORY, "writer");
+
+        final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "users");
+        flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/users");
+        flowFile.assertAttributeEquals("record.count", "3");
+        assertTrue(service.getExecutedStatements().isEmpty());
+        // Verify the table column names are the field names in the output
+        assertTrue(flowFile.getContent().startsWith("name,favorite_number,favorite_color,scale\n"));
+    }
+
+    @Test
+    public void testUpdateFieldsAllNamesMatchTableExists() throws Exception {
+        final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user.avsc"), StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+        configure(processor, 3);
+        runner.setProperty(UpdateHive3Table.TABLE_NAME, "users");
+
+        RecordSetWriterFactory recordWriter = new CSVRecordSetWriter();
+        runner.addControllerService("writer", recordWriter);
+        runner.enableControllerService(recordWriter);
+        runner.setProperty(UpdateHive3Table.UPDATE_FIELD_NAMES, "true");
+        runner.assertNotValid();
+        runner.setProperty(UpdateHive3Table.RECORD_WRITER_FACTORY, "writer");
+
+        final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
+        runner.enqueue("name,favorite_number,favorite_color,scale\n".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "users");
+        flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/users");
+        flowFile.assertAttributeNotExists("record.count");
+        assertTrue(service.getExecutedStatements().isEmpty());
+        // Verify the table column names are the field names in the output
+        assertTrue(flowFile.getContent().startsWith("name,favorite_number,favorite_color,scale\n"));
+    }
+
+    @Test
+    public void testUpdateFieldsAllNamesMatchCreateTable() throws Exception {
+        final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user_mixedcase.avsc"), StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+        configure(processor, 1);
+        runner.setProperty(UpdateHive3Table.TABLE_NAME, "_newTable");
+
+        RecordSetWriterFactory recordWriter = new CSVRecordSetWriter();
+        runner.addControllerService("writer", recordWriter);
+        runner.enableControllerService(recordWriter);
+        runner.setProperty(UpdateHive3Table.UPDATE_FIELD_NAMES, "true");
+        runner.assertNotValid();
+        runner.setProperty(UpdateHive3Table.RECORD_WRITER_FACTORY, "writer");
+
+        runner.setProperty(UpdateHive3Table.CREATE_TABLE, UpdateHive3Table.CREATE_IF_NOT_EXISTS);
+        runner.setProperty(UpdateHive3Table.TABLE_STORAGE_FORMAT, UpdateHive3Table.AVRO);
+        final MockHiveConnectionPool service = new MockHiveConnectionPool("_newTable");
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
+        runner.enqueue("name,favorite_number,favorite_color,scale\n".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "_newTable");
+        flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/_newTable");
+        flowFile.assertAttributeEquals("record.count", "1");
+        List<String> statements = service.getExecutedStatements();
+        assertEquals(1, statements.size());
+        assertEquals("CREATE TABLE IF NOT EXISTS `_newTable` (`NAME` STRING, `Favorite_number` INT, `favorite_Color` STRING, `scale` DOUBLE) STORED AS AVRO",
+                statements.get(0));
+        // Verify the table column names are the field names in the output
+        assertTrue(flowFile.getContent().startsWith("name,favorite_number,favorite_color,scale\n"));
+    }
+
private static final class MockUpdateHive3Table extends UpdateHive3Table {
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/user_mixedcase.avsc b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/user_mixedcase.avsc
new file mode 100644
index 0000000..67eb56d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/user_mixedcase.avsc
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "NAME", "type": "string"},
+ {"name": "Favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_Color", "type": ["string", "null"]},
+ {"name": "scale", "type": ["double", "null"]}
+ ]
+}
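This schema deliberately mixes case (NAME, Favorite_number, favorite_Color) so the tests can exercise the renaming path against the mock table's lowercase columns, while scale already matches exactly. As a quick sanity check, the schema can be parsed and its field names listed with the same Avro API the tests use; a small sketch with the schema inlined for self-containment:

import org.apache.avro.Schema;

public class MixedCaseSchemaCheck {
    public static void main(String[] args) {
        String avsc = "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"User\", \"fields\": ["
                + "{\"name\": \"NAME\", \"type\": \"string\"},"
                + "{\"name\": \"Favorite_number\", \"type\": [\"int\", \"null\"]},"
                + "{\"name\": \"favorite_Color\", \"type\": [\"string\", \"null\"]},"
                + "{\"name\": \"scale\", \"type\": [\"double\", \"null\"]}]}";
        Schema schema = new Schema.Parser().parse(avsc);
        // Prints: NAME, Favorite_number, favorite_Color, scale
        schema.getFields().forEach(f -> System.out.println(f.name()));
    }
}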
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive_1_1Table.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive_1_1Table.java
index f6d03a6..38ce502 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive_1_1Table.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive_1_1Table.java
@@ -26,10 +26,13 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.dbcp.hive.Hive_1_1DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -40,9 +43,16 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
@@ -60,9 +70,12 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -78,7 +91,10 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "output.table", description = "This
attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the target table
name."),
@WritesAttribute(attribute = "output.path", description = "This
attribute is written on the flow files routed to the 'success' "
- + "and 'failure' relationships, and contains the path on the
file system to the table (or partition location if the table is partitioned).")
+ + "and 'failure' relationships, and contains the path on the
file system to the table (or partition location if the table is partitioned)."),
+ @WritesAttribute(attribute = "mime.type", description = "Sets the
mime.type attribute to the MIME Type specified by the Record Writer, only if a
Record Writer is specified "
+ + "and Update Field Names is 'true'."),
+ @WritesAttribute(attribute = "record.count", description = "Sets the
number of records in the FlowFile, only if a Record Writer is specified and
Update Field Names is 'true'.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@RequiresInstanceClassLoading
@@ -168,6 +184,31 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
.dependsOn(CREATE_TABLE, CREATE_IF_NOT_EXISTS)
.build();
+    static final PropertyDescriptor UPDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
+            .name("hive11-update-field-names")
+            .displayName("Update Field Names")
+            .description("This property indicates whether to update the output schema such that the field names are set to the exact column names from the specified "
+                    + "table. This should be used if the incoming record field names may not match the table's column names in terms of upper- and lower-case. For example, this property should be "
+                    + "set to true if the output FlowFile (and target table storage) is Avro format, as Hive/Impala expects the field names to match the column names exactly.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+            .name("hive11-record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer should use Inherit Schema to emulate the inferred schema behavior, i.e. "
+                    + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types. If Create Table Strategy is set to "
+                    + "'Create If Not Exists', the Record Writer's output format must match the Record Reader's format in order for the data to be placed in the created table location. Note that "
+                    + "this property is only used if 'Update Field Names' is set to true and the field names do not all match the column names exactly. If no "
+                    + "update is needed for any field names (or 'Update Field Names' is false), the Record Writer is not used and instead the input FlowFile is routed to success or failure "
+                    + "without modification.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .dependsOn(UPDATE_FIELD_NAMES, "true")
+            .required(true)
+            .build();
+
    static final PropertyDescriptor EXTERNAL_TABLE_LOCATION = new PropertyDescriptor.Builder()
            .name("hive11-external-table-location")
            .displayName("External Table Location")
@@ -242,6 +283,8 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
props.add(TABLE_MANAGEMENT_STRATEGY);
props.add(EXTERNAL_TABLE_LOCATION);
props.add(TABLE_STORAGE_FORMAT);
+ props.add(UPDATE_FIELD_NAMES);
+ props.add(RECORD_WRITER_FACTORY);
props.add(QUERY_TIMEOUT);
propertyDescriptors = Collections.unmodifiableList(props);
@@ -263,6 +306,30 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
}
@Override
+ protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
+ List<ValidationResult> validationResults = new
ArrayList<>(super.customValidate(validationContext));
+ final boolean recordWriterFactorySet =
validationContext.getProperty(RECORD_WRITER_FACTORY).isSet();
+ final boolean createIfNotExists =
validationContext.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
+ final boolean updateFieldNames =
validationContext.getProperty(UPDATE_FIELD_NAMES).asBoolean();
+
+ if (!recordWriterFactorySet && updateFieldNames) {
+ validationResults.add(new
ValidationResult.Builder().subject(RECORD_WRITER_FACTORY.getDisplayName())
+ .explanation("Record Writer must be set if 'Update Field
Names' is true").valid(false).build());
+ }
+ final String tableManagementStrategy =
validationContext.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
+ final boolean managedTable;
+ if
(!ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
+ managedTable =
MANAGED_TABLE.getValue().equals(tableManagementStrategy);
+ // Ensure valid configuration for external tables
+ if (createIfNotExists && !managedTable &&
!validationContext.getProperty(EXTERNAL_TABLE_LOCATION).isSet()) {
+ validationResults.add(new
ValidationResult.Builder().subject(EXTERNAL_TABLE_LOCATION.getDisplayName())
+ .explanation("External Table Location must be set when
Table Management Strategy is set to External").valid(false).build());
+ }
+ }
+ return validationResults;
+ }
+
+ @Override
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
FlowFile flowFile = session.get();
@@ -271,6 +338,7 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
}
        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory recordWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String partitionClauseString = context.getProperty(PARTITION_CLAUSE).evaluateAttributeExpressions(flowFile).getValue();
List<String> partitionClauseElements = null;
@@ -297,13 +365,26 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
                    new Object[]{RecordReader.class.getSimpleName(), flowFile},
                    rrfe
            );
+            // Since we are wrapping the exceptions above there should always be a cause
+            // but it's possible it might not have a message. This handles that by logging
+            // the name of the class thrown.
+            Throwable c = rrfe.getCause();
+            if (c != null) {
+                session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
+            } else {
+                session.putAttribute(flowFile, "record.error.message", rrfe.getClass().getCanonicalName() + " Thrown");
+            }
session.transfer(flowFile, REL_FAILURE);
return;
}
-        RecordSchema recordSchema = reader.getSchema();
+        final RecordSchema recordSchema = reader.getSchema();
        final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
+        final boolean updateFieldNames = context.getProperty(UPDATE_FIELD_NAMES).asBoolean();
+        if (recordWriterFactory == null && updateFieldNames) {
+            throw new ProcessException("Record Writer must be set if 'Update Field Names' is true");
+        }
        final String tableManagementStrategy = context.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
        final boolean managedTable;
        if (ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
@@ -336,8 +417,54 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
        final String storageFormat = context.getProperty(TABLE_STORAGE_FORMAT).getValue();
        final Hive_1_1DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive_1_1DBCPService.class);
        try (final Connection connection = dbcpService.getConnection()) {
-            checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, partitionClauseElements, createIfNotExists, externalTableLocation, storageFormat);
-            flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
+            final Map<String,String> attributes = new HashMap<>(flowFile.getAttributes());
+            OutputMetadataHolder outputMetadataHolder = checkAndUpdateTableSchema(attributes, connection, recordSchema, tableName, partitionClauseElements,
+                    createIfNotExists, externalTableLocation, storageFormat, updateFieldNames);
+ if (outputMetadataHolder != null) {
+ // The output schema changed (i.e. field names were
updated), so write out the corresponding FlowFile
+ try {
+ final FlowFile inputFlowFile = flowFile;
+ flowFile = session.write(flowFile, (in, out) -> {
+
+ // if we fail to create the RecordReader then we
want to route to failure, so we need to
+ // handle this separately from the other
IOExceptions which normally route to retry
+ final RecordReader recordReader;
+ final RecordSetWriter recordSetWriter;
+ try {
+ recordReader =
recordReaderFactory.createRecordReader(inputFlowFile, in, getLogger());
+ recordSetWriter =
recordWriterFactory.createWriter(getLogger(),
outputMetadataHolder.getOutputSchema(), out, attributes);
+ } catch (Exception e) {
+ if(e instanceof IOException) {
+ throw (IOException) e;
+ }
+ throw new IOException(new
RecordReaderFactoryException("Unable to create RecordReader", e));
+ }
+
+ WriteResult writeResult =
updateRecords(recordSchema, outputMetadataHolder, recordReader,
recordSetWriter);
+ recordSetWriter.flush();
+ recordSetWriter.close();
+ attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
recordSetWriter.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+ });
+ } catch (final Exception e) {
+ getLogger().error("Failed to process {}; will route to
failure", new Object[]{flowFile, e});
+ // Since we are wrapping the exceptions above there
should always be a cause
+ // but it's possible it might not have a message. This
handles that by logging
+ // the name of the class thrown.
+ Throwable c = e.getCause();
+ if (c != null) {
+ session.putAttribute(flowFile,
"record.error.message", (c.getLocalizedMessage() != null) ?
c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
+ } else {
+ session.putAttribute(flowFile,
"record.error.message", e.getClass().getCanonicalName() + " Thrown");
+ }
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ }
+ attributes.put(ATTR_OUTPUT_TABLE, tableName);
+ flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().invokeRemoteProcess(flowFile,
dbcpService.getConnectionURL());
session.transfer(flowFile, REL_SUCCESS);
}
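The try/catch inside the session.write() lambda above exists because a StreamCallback may only throw IOException: any reader/writer setup failure is wrapped in an IOException (with a RecordReaderFactoryException cause) so it can cross the callback boundary, then unwrapped again in the outer catch. The trick in isolation, with stand-in types that are illustrative and not from this commit:

    import java.io.IOException;

    public class CallbackWrapSketch {
        // Stand-in for StreamCallback: only IOException may escape
        interface IoCallback {
            void process() throws IOException;
        }

        static void write(IoCallback callback) throws IOException {
            callback.process();
        }

        public static void main(String[] args) {
            try {
                write(() -> {
                    try {
                        throw new IllegalStateException("writer creation failed");
                    } catch (Exception e) {
                        if (e instanceof IOException) {
                            throw (IOException) e; // real I/O errors pass through untouched
                        }
                        throw new IOException(e); // everything else is smuggled out as the cause
                    }
                });
            } catch (IOException e) {
                // the outer handler recovers the original failure via getCause()
                System.out.println("cause: " + e.getCause());
            }
        }
    }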
@@ -356,9 +483,9 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
         }
     }
 
-    private synchronized void checkAndUpdateTableSchema(final ProcessSession session, final FlowFile flowFile, final Connection conn, final RecordSchema schema,
-                                                        final String tableName, List<String> partitionClause, final boolean createIfNotExists,
-                                                        final String externalTableLocation, final String storageFormat) throws IOException {
+    private synchronized OutputMetadataHolder checkAndUpdateTableSchema(Map<String,String> attributes, final Connection conn, final RecordSchema schema,
+                                                                        final String tableName, List<String> partitionClause, final boolean createIfNotExists,
+                                                                        final String externalTableLocation, final String storageFormat, final boolean updateFieldNames) throws IOException {
         // Read in the current table metadata, compare it to the reader's schema, and
         // add any columns from the schema that are missing in the table
         try (Statement s = conn.createStatement()) {
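Note the shape of this refactor: checkAndUpdateTableSchema() no longer touches the ProcessSession or FlowFile directly. It reads and writes a plain attribute map and returns the OutputMetadataHolder, which keeps it callable before the session.write() above; all attribute changes then land on the FlowFile in a single putAllAttributes() call afterwards.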
@@ -466,7 +593,7 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
                     String partitionColumnName;
                     columnName = tableInfo.getString(1);
                     if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
-                        hiveColumns.add(columnName);
+                        partitionColumns.add(columnName);
                     }
                     // If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
                     if (columnName.startsWith("#")) {
@@ -488,7 +615,7 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
                 for (int i = 0; i < partitionClauseSize; i++) {
                     String partitionName = partitionClause.get(i).split(" ")[0];
-                    String partitionValue = flowFile.getAttribute(partitionName);
+                    String partitionValue = attributes.get(partitionName);
                     if (StringUtils.isEmpty(partitionValue)) {
                         throw new IOException("No value found for partition value attribute '" + partitionName + "'");
                     }
@@ -562,7 +689,42 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
                 outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
             }
-            session.putAttribute(flowFile, ATTR_OUTPUT_PATH, outputPath);
+            // If updating field names, return a new RecordSchema, otherwise return null
+            OutputMetadataHolder outputMetadataHolder;
+            if (updateFieldNames) {
+                List<RecordField> inputRecordFields = schema.getFields();
+                List<RecordField> outputRecordFields = new ArrayList<>();
+                Map<String,String> fieldMap = new HashMap<>();
+                boolean needsUpdating = false;
+
+                for (RecordField inputRecordField : inputRecordFields) {
+                    final String inputRecordFieldName = inputRecordField.getFieldName();
+                    boolean found = false;
+                    for (String hiveColumnName : hiveColumns) {
+                        if (inputRecordFieldName.equalsIgnoreCase(hiveColumnName)) {
+                            // Set a flag if the field name doesn't match the column name exactly. This overall flag will determine whether
+                            // the records need updating (if true) or not (if false)
+                            if (!inputRecordFieldName.equals(hiveColumnName)) {
+                                needsUpdating = true;
+                            }
+                            fieldMap.put(inputRecordFieldName, hiveColumnName);
+                            outputRecordFields.add(new RecordField(hiveColumnName, inputRecordField.getDataType(), inputRecordField.getDefaultValue(), inputRecordField.isNullable()));
+                            found = true;
+                            break;
+                        }
+                    }
+                    if (!found) {
+                        // If the input field wasn't a Hive table column, add it back to the schema as-is
+                        fieldMap.put(inputRecordFieldName, inputRecordFieldName);
+                    }
+                }
+                outputMetadataHolder = needsUpdating ? new OutputMetadataHolder(new SimpleRecordSchema(outputRecordFields), fieldMap)
+                        : null;
+            } else {
+                outputMetadataHolder = null;
+            }
+            attributes.put(ATTR_OUTPUT_PATH, outputPath);
+            return outputMetadataHolder;
         } catch (Exception e) {
             throw new IOException(e);
         }
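The matching loop above is a case-insensitive join between the reader's field names and the table's column names; only a case-only mismatch flips needsUpdating and forces the record rewrite. The same idea with plain collections and hypothetical field and column names:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FieldMapSketch {
        public static void main(String[] args) {
            final List<String> recordFields = Arrays.asList("ID", "name", "Email", "extra");
            final List<String> hiveColumns = Arrays.asList("id", "name", "email");

            final Map<String, String> fieldMap = new HashMap<>();
            boolean needsUpdating = false;
            for (String field : recordFields) {
                for (String column : hiveColumns) {
                    if (field.equalsIgnoreCase(column)) {
                        // an exact match maps to itself and changes nothing;
                        // a case-only match forces a rewrite of the records
                        needsUpdating |= !field.equals(column);
                        fieldMap.put(field, column);
                        break;
                    }
                }
                fieldMap.putIfAbsent(field, field); // not a table column: keep the name as-is
            }
            // mapping: ID->id, name->name, Email->email, extra->extra; needsUpdating=true
            System.out.println(fieldMap + " needsUpdating=" + needsUpdating);
        }
    }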
@@ -644,7 +806,46 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
             }
             return null;
         }
-
         throw new IllegalArgumentException("Error converting Avro type " + dataType.name() + " to Hive type");
     }
+
+    private synchronized WriteResult updateRecords(final RecordSchema inputRecordSchema, final OutputMetadataHolder outputMetadataHolder,
+                                                   final RecordReader reader, final RecordSetWriter writer) throws IOException {
+        try {
+            writer.beginRecordSet();
+            Record inputRecord;
+            while((inputRecord = reader.nextRecord()) != null) {
+                List<RecordField> inputRecordFields = inputRecordSchema.getFields();
+                Map<String,Object> outputRecordFields = new HashMap<>(inputRecordFields.size());
+                // Copy values from input field name to output field name
+                for(Map.Entry<String,String> mapping : outputMetadataHolder.getFieldMap().entrySet()) {
+                    outputRecordFields.put(mapping.getValue(), inputRecord.getValue(mapping.getKey()));
+                }
+                Record outputRecord = new MapRecord(outputMetadataHolder.getOutputSchema(), outputRecordFields);
+                writer.write(outputRecord);
+            }
+            return writer.finishRecordSet();
+
+        } catch (MalformedRecordException mre) {
+            throw new IOException("Error reading records: "+mre.getMessage(), mre);
+        }
+    }
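Given that fieldMap, updateRecords() is a per-record rename: each value is read under its input name and stored under the matching Hive column name in a MapRecord built against the corrected schema. A minimal standalone illustration with the record API, using a hypothetical single-field schema:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nifi.serialization.SimpleRecordSchema;
    import org.apache.nifi.serialization.record.MapRecord;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordField;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.RecordSchema;

    public class RenameSketch {
        public static void main(String[] args) {
            // Input record uses "ID"; the Hive table column is "id"
            final RecordSchema inputSchema = new SimpleRecordSchema(
                    Arrays.asList(new RecordField("ID", RecordFieldType.INT.getDataType())));
            final Map<String, Object> inputValues = new HashMap<>();
            inputValues.put("ID", 42);
            final Record input = new MapRecord(inputSchema, inputValues);

            final RecordSchema outputSchema = new SimpleRecordSchema(
                    Arrays.asList(new RecordField("id", RecordFieldType.INT.getDataType())));
            final Map<String, Object> outputValues = new HashMap<>();
            outputValues.put("id", input.getValue("ID")); // fieldMap entry: ID -> id
            final Record output = new MapRecord(outputSchema, outputValues);

            System.out.println(output.getValue("id")); // prints 42
        }
    }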
+
+    private static class OutputMetadataHolder {
+        private final RecordSchema outputSchema;
+        private final Map<String,String> fieldMap;
+
+        public OutputMetadataHolder(RecordSchema outputSchema, Map<String, String> fieldMap) {
+            this.outputSchema = outputSchema;
+            this.fieldMap = fieldMap;
+        }
+
+        public RecordSchema getOutputSchema() {
+            return outputSchema;
+        }
+
+        public Map<String, String> getFieldMap() {
+            return fieldMap;
+        }
+    }
 }