NIFI-3055 StandardRecordWriter Can Throw UTFDataFormatException (1.x)
* Remove function based on JDK source.
* Add new function to find bytes based on RFC3629.
* Add field name to log entry when field is truncated.

Signed-off-by: Mike Moser <[email protected]>
This closes #1481


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e97d7460
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e97d7460
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e97d7460

Branch: refs/heads/support/nifi-0.7.x
Commit: e97d7460a0fe0563b52969a378a0b21172a3c347
Parents: de8d013
Author: Joe Skora <[email protected]>
Authored: Tue Feb 7 17:09:20 2017 +0000
Committer: Mike Moser <[email protected]>
Committed: Mon Feb 13 16:07:03 2017 -0500

----------------------------------------------------------------------
 .../nifi/provenance/StandardRecordWriter.java   | 84 ++++++++++----------
 1 file changed, 43 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e97d7460/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index f015cc8..b14726a 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -87,7 +87,7 @@ public class StandardRecordWriter implements RecordWriter {
             lastBlockOffset = rawOutStream.getBytesWritten();
             resetWriteStream(firstEventId);
 
-            writeUTFLimited(out, PersistentProvenanceRepository.class.getName());
+            writeUTFLimited(out, PersistentProvenanceRepository.class.getName(), "PersistentProvenanceRepository.class.name");
             out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION);
             out.flush();
         } catch (final IOException ioe) {
@@ -165,7 +165,7 @@ public class StandardRecordWriter implements RecordWriter {
             }
 
             out.writeLong(recordIdentifier);
-            writeUTFLimited(out, record.getEventType().name());
+            writeUTFLimited(out, record.getEventType().name(), "EventType");
             out.writeLong(record.getEventTime());
             out.writeLong(record.getFlowFileEntryDate());
             out.writeLong(record.getEventDuration());
@@ -173,10 +173,10 @@ public class StandardRecordWriter implements RecordWriter {
             writeUUIDs(out, record.getLineageIdentifiers());
             out.writeLong(record.getLineageStartDate());
 
-            writeNullableString(out, record.getComponentId());
-            writeNullableString(out, record.getComponentType());
+            writeNullableString(out, record.getComponentId(), "ComponentId");
+            writeNullableString(out, record.getComponentType(), "ComponentType");
             writeUUID(out, record.getFlowFileUuid());
-            writeNullableString(out, record.getDetails());
+            writeNullableString(out, record.getDetails(), "Details");
 
             // Write FlowFile attributes
             final Map<String, String> attrs = record.getPreviousAttributes();
@@ -196,9 +196,9 @@ public class StandardRecordWriter implements RecordWriter {
             // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
             if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) {
                 out.writeBoolean(true);
-                writeUTFLimited(out, record.getContentClaimContainer());
-                writeUTFLimited(out, record.getContentClaimSection());
-                writeUTFLimited(out, record.getContentClaimIdentifier());
+                writeUTFLimited(out, record.getContentClaimContainer(), "ContentClaimContainer");
+                writeUTFLimited(out, record.getContentClaimSection(), "ContentClaimSection");
+                writeUTFLimited(out, record.getContentClaimIdentifier(), "ContentClaimIdentifier");
                 if (record.getContentClaimOffset() == null) {
                     out.writeLong(0L);
                 } else {
@@ -212,9 +212,9 @@ public class StandardRecordWriter implements RecordWriter {
             // If Previous Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
             if (record.getPreviousContentClaimSection() != null && record.getPreviousContentClaimContainer() != null && record.getPreviousContentClaimIdentifier() != null) {
                 out.writeBoolean(true);
-                writeUTFLimited(out, record.getPreviousContentClaimContainer());
-                writeUTFLimited(out, record.getPreviousContentClaimSection());
-                writeUTFLimited(out, record.getPreviousContentClaimIdentifier());
+                writeUTFLimited(out, record.getPreviousContentClaimContainer(), "PreviousContentClaimContainer");
+                writeUTFLimited(out, record.getPreviousContentClaimSection(), "PreviousContentClaimSection");
+                writeUTFLimited(out, record.getPreviousContentClaimIdentifier(), "PreviousContentClaimIdentifier");
                 if (record.getPreviousContentClaimOffset() == null) {
                     out.writeLong(0L);
                 } else {
@@ -231,23 +231,23 @@ public class StandardRecordWriter implements RecordWriter {
             }
 
             // write out the identifier of the destination queue.
-            writeNullableString(out, record.getSourceQueueIdentifier());
+            writeNullableString(out, record.getSourceQueueIdentifier(), "SourceQueueIdentifier");
 
             // Write type-specific info
             if (recordType == ProvenanceEventType.FORK || recordType == ProvenanceEventType.JOIN || recordType == ProvenanceEventType.CLONE || recordType == ProvenanceEventType.REPLAY) {
                 writeUUIDs(out, record.getParentUuids());
                 writeUUIDs(out, record.getChildUuids());
             } else if (recordType == ProvenanceEventType.RECEIVE) {
-                writeNullableString(out, record.getTransitUri());
-                writeNullableString(out, record.getSourceSystemFlowFileIdentifier());
+                writeNullableString(out, record.getTransitUri(), "TransitUri");
+                writeNullableString(out, record.getSourceSystemFlowFileIdentifier(), "SourceSystemFlowFileIdentifier");
             } else if (recordType == ProvenanceEventType.FETCH) {
-                writeNullableString(out, record.getTransitUri());
+                writeNullableString(out, record.getTransitUri(), "TransitUri");
             } else if (recordType == ProvenanceEventType.SEND) {
-                writeNullableString(out, record.getTransitUri());
+                writeNullableString(out, record.getTransitUri(), "TransitUri");
             } else if (recordType == ProvenanceEventType.ADDINFO) {
-                writeNullableString(out, record.getAlternateIdentifierUri());
+                writeNullableString(out, record.getAlternateIdentifierUri(), "AlternateIdentifierUri");
             } else if (recordType == ProvenanceEventType.ROUTE) {
-                writeNullableString(out, record.getRelationship());
+                writeNullableString(out, record.getRelationship(), "Relationship");
             }
 
             out.flush();
@@ -260,7 +260,7 @@ public class StandardRecordWriter implements RecordWriter {
     }
 
     protected void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
-        writeUTFLimited(out, uuid);
+        writeUTFLimited(out, uuid, "UUID");
     }
 
     protected void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
@@ -274,12 +274,12 @@ public class StandardRecordWriter implements RecordWriter {
         }
     }
 
-    protected void writeNullableString(final DataOutputStream out, final String toWrite) throws IOException {
+    protected void writeNullableString(final DataOutputStream out, final String toWrite, final String fieldName) throws IOException {
         if (toWrite == null) {
             out.writeBoolean(false);
         } else {
             out.writeBoolean(true);
-            writeUTFLimited(out, toWrite);
+            writeUTFLimited(out, toWrite, fieldName);
         }
     }
 
@@ -405,14 +405,14 @@ public class StandardRecordWriter implements RecordWriter {
         return dirtyFlag.get();
     }
 
-    private void writeUTFLimited(final DataOutputStream out, final String utfString) throws IOException {
+    private void writeUTFLimited(final DataOutputStream out, final String utfString, final String fieldName) throws IOException {
         try {
             out.writeUTF(utfString);
         } catch (UTFDataFormatException e) {
-            final String truncated = utfString.substring(0, getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH));
-            logger.warn("Truncating provenance record value!  Attempted to write {} chars that encode to a UTF byte length greater than "
+            final String truncated = utfString.substring(0, getCharsInUTF8Limit(utfString, MAX_ALLOWED_UTF_LENGTH));
+            logger.warn("Truncating repository record value for field '{}'!  Attempted to write {} chars that encode to a UTF8 byte length greater than "
                             + "supported maximum ({}), truncating to {} chars.",
-                    utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length());
+                    (fieldName == null) ? "" : fieldName, utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length());
             if (logger.isDebugEnabled()) {
                 logger.warn("String value was:\n{}", truncated);
             }
@@ -420,26 +420,28 @@ public class StandardRecordWriter implements RecordWriter {
         }
     }
 
-    static int getCharsInUTFLength(final String str, final int utfLimit) {
-        // see java.io.DataOutputStream.writeUTF()
-        int strlen = str.length();
-        int utflen = 0;
-        int c;
-
-        /* use charAt instead of copying String to Char array */
-        for (int i = 0; i < strlen; i++) {
-            c = str.charAt(i);
-            if ((c >= 0x0001) & (c <= 0x007F)) {
-                utflen++;
-            } else if (c > 0x07FF) {
-                utflen += 3;
+    static int getCharsInUTF8Limit(final String str, final int utf8Limit) {
+        // Calculate how much of String fits within UTF8 byte limit based on RFC3629.
+        //
+        // Java String values use char[] for storage, so character values >0xFFFF that
+        // map to 4 byte UTF8 representations are not considered.
+
+        final int charsInOriginal = str.length();
+        int bytesInUTF8 = 0;
+
+        for (int i = 0; i < charsInOriginal; i++) {
+            final int curr = str.charAt(i);
+            if (curr < 0x0080) {
+                bytesInUTF8++;
+            } else if (curr < 0x0800) {
+                bytesInUTF8 += 2;
             } else {
-                utflen += 2;
+                bytesInUTF8 += 3;
             }
-            if (utflen > utfLimit) {
+            if (bytesInUTF8 > utf8Limit) {
                 return i;
             }
         }
-        return strlen;
+        return charsInOriginal;
     }
 }

Reply via email to