NIFI-3055 StandardRecordWriter Can Throw UTFDataFormatException (1.x)
* Remove function based on JDK source.
* Add new function that computes how many characters of a string fit within a UTF-8 byte limit, based on RFC 3629.
* Add field name to log entry when field is truncated.
Signed-off-by: Mike Moser <[email protected]> This closes #1481 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e97d7460 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e97d7460 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e97d7460 Branch: refs/heads/support/nifi-0.7.x Commit: e97d7460a0fe0563b52969a378a0b21172a3c347 Parents: de8d013 Author: Joe Skora <[email protected]> Authored: Tue Feb 7 17:09:20 2017 +0000 Committer: Mike Moser <[email protected]> Committed: Mon Feb 13 16:07:03 2017 -0500 ---------------------------------------------------------------------- .../nifi/provenance/StandardRecordWriter.java | 84 ++++++++++---------- 1 file changed, 43 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e97d7460/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java index f015cc8..b14726a 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java @@ -87,7 +87,7 @@ public class StandardRecordWriter implements RecordWriter { lastBlockOffset = rawOutStream.getBytesWritten(); resetWriteStream(firstEventId); - writeUTFLimited(out, 
PersistentProvenanceRepository.class.getName()); + writeUTFLimited(out, PersistentProvenanceRepository.class.getName(), "PersistentProvenanceRepository.class.name"); out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION); out.flush(); } catch (final IOException ioe) { @@ -165,7 +165,7 @@ public class StandardRecordWriter implements RecordWriter { } out.writeLong(recordIdentifier); - writeUTFLimited(out, record.getEventType().name()); + writeUTFLimited(out, record.getEventType().name(), "EventType"); out.writeLong(record.getEventTime()); out.writeLong(record.getFlowFileEntryDate()); out.writeLong(record.getEventDuration()); @@ -173,10 +173,10 @@ public class StandardRecordWriter implements RecordWriter { writeUUIDs(out, record.getLineageIdentifiers()); out.writeLong(record.getLineageStartDate()); - writeNullableString(out, record.getComponentId()); - writeNullableString(out, record.getComponentType()); + writeNullableString(out, record.getComponentId(), "ComponentId"); + writeNullableString(out, record.getComponentType(), "ComponentType"); writeUUID(out, record.getFlowFileUuid()); - writeNullableString(out, record.getDetails()); + writeNullableString(out, record.getDetails(), "Details"); // Write FlowFile attributes final Map<String, String> attrs = record.getPreviousAttributes(); @@ -196,9 +196,9 @@ public class StandardRecordWriter implements RecordWriter { // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'. 
if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) { out.writeBoolean(true); - writeUTFLimited(out, record.getContentClaimContainer()); - writeUTFLimited(out, record.getContentClaimSection()); - writeUTFLimited(out, record.getContentClaimIdentifier()); + writeUTFLimited(out, record.getContentClaimContainer(), "ContentClaimContainer"); + writeUTFLimited(out, record.getContentClaimSection(), "ContentClaimSection"); + writeUTFLimited(out, record.getContentClaimIdentifier(), "ContentClaimIdentifier"); if (record.getContentClaimOffset() == null) { out.writeLong(0L); } else { @@ -212,9 +212,9 @@ public class StandardRecordWriter implements RecordWriter { // If Previous Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'. if (record.getPreviousContentClaimSection() != null && record.getPreviousContentClaimContainer() != null && record.getPreviousContentClaimIdentifier() != null) { out.writeBoolean(true); - writeUTFLimited(out, record.getPreviousContentClaimContainer()); - writeUTFLimited(out, record.getPreviousContentClaimSection()); - writeUTFLimited(out, record.getPreviousContentClaimIdentifier()); + writeUTFLimited(out, record.getPreviousContentClaimContainer(), "PreviousContentClaimContainer"); + writeUTFLimited(out, record.getPreviousContentClaimSection(), "PreviousContentClaimSection"); + writeUTFLimited(out, record.getPreviousContentClaimIdentifier(), "PreviousContentClaimIdentifier"); if (record.getPreviousContentClaimOffset() == null) { out.writeLong(0L); } else { @@ -231,23 +231,23 @@ public class StandardRecordWriter implements RecordWriter { } // write out the identifier of the destination queue. 
- writeNullableString(out, record.getSourceQueueIdentifier()); + writeNullableString(out, record.getSourceQueueIdentifier(), "SourceQueueIdentifier"); // Write type-specific info if (recordType == ProvenanceEventType.FORK || recordType == ProvenanceEventType.JOIN || recordType == ProvenanceEventType.CLONE || recordType == ProvenanceEventType.REPLAY) { writeUUIDs(out, record.getParentUuids()); writeUUIDs(out, record.getChildUuids()); } else if (recordType == ProvenanceEventType.RECEIVE) { - writeNullableString(out, record.getTransitUri()); - writeNullableString(out, record.getSourceSystemFlowFileIdentifier()); + writeNullableString(out, record.getTransitUri(), "TransitUri"); + writeNullableString(out, record.getSourceSystemFlowFileIdentifier(), "SourceSystemFlowFileIdentifier"); } else if (recordType == ProvenanceEventType.FETCH) { - writeNullableString(out, record.getTransitUri()); + writeNullableString(out, record.getTransitUri(), "TransitUri"); } else if (recordType == ProvenanceEventType.SEND) { - writeNullableString(out, record.getTransitUri()); + writeNullableString(out, record.getTransitUri(), "TransitUri"); } else if (recordType == ProvenanceEventType.ADDINFO) { - writeNullableString(out, record.getAlternateIdentifierUri()); + writeNullableString(out, record.getAlternateIdentifierUri(), "AlternateIdentifierUri"); } else if (recordType == ProvenanceEventType.ROUTE) { - writeNullableString(out, record.getRelationship()); + writeNullableString(out, record.getRelationship(), "Relationship"); } out.flush(); @@ -260,7 +260,7 @@ public class StandardRecordWriter implements RecordWriter { } protected void writeUUID(final DataOutputStream out, final String uuid) throws IOException { - writeUTFLimited(out, uuid); + writeUTFLimited(out, uuid, "UUID"); } protected void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException { @@ -274,12 +274,12 @@ public class StandardRecordWriter implements RecordWriter { } } - protected void 
writeNullableString(final DataOutputStream out, final String toWrite) throws IOException { + protected void writeNullableString(final DataOutputStream out, final String toWrite, final String fieldName) throws IOException { if (toWrite == null) { out.writeBoolean(false); } else { out.writeBoolean(true); - writeUTFLimited(out, toWrite); + writeUTFLimited(out, toWrite, fieldName); } } @@ -405,14 +405,14 @@ public class StandardRecordWriter implements RecordWriter { return dirtyFlag.get(); } - private void writeUTFLimited(final DataOutputStream out, final String utfString) throws IOException { + private void writeUTFLimited(final DataOutputStream out, final String utfString, final String fieldName) throws IOException { try { out.writeUTF(utfString); } catch (UTFDataFormatException e) { - final String truncated = utfString.substring(0, getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH)); - logger.warn("Truncating provenance record value! Attempted to write {} chars that encode to a UTF byte length greater than " + final String truncated = utfString.substring(0, getCharsInUTF8Limit(utfString, MAX_ALLOWED_UTF_LENGTH)); + logger.warn("Truncating repository record value for field '{}'! Attempted to write {} chars that encode to a UTF8 byte length greater than " + "supported maximum ({}), truncating to {} chars.", - utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length()); + (fieldName == null) ? 
"" : fieldName, utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length()); if (logger.isDebugEnabled()) { logger.warn("String value was:\n{}", truncated); } @@ -420,26 +420,28 @@ public class StandardRecordWriter implements RecordWriter { } } - static int getCharsInUTFLength(final String str, final int utfLimit) { - // see java.io.DataOutputStream.writeUTF() - int strlen = str.length(); - int utflen = 0; - int c; - - /* use charAt instead of copying String to Char array */ - for (int i = 0; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) & (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; + static int getCharsInUTF8Limit(final String str, final int utf8Limit) { + // Calculate how much of String fits within UTF8 byte limit based on RFC3629. + // + // Java String values use char[] for storage, so character values >0xFFFF that + // map to 4 byte UTF8 representations are not considered. + + final int charsInOriginal = str.length(); + int bytesInUTF8 = 0; + + for (int i = 0; i < charsInOriginal; i++) { + final int curr = str.charAt(i); + if (curr < 0x0080) { + bytesInUTF8++; + } else if (curr < 0x0800) { + bytesInUTF8 += 2; } else { - utflen += 2; + bytesInUTF8 += 3; } - if (utflen > utfLimit) { + if (bytesInUTF8 > utf8Limit) { return i; } } - return strlen; + return charsInOriginal; } }
