Repository: nifi
Updated Branches:
  refs/heads/master 2d6d7710c -> 376af83a3


NIFI-3055 StandardRecordWriter Can Throw UTFDataFormatException
* Updated StandardRecordWriter, even though it is now deprecated to consider 
the encoding behavior of java.io.DataOutputStream.writeUTF() and truncate 
string values such that the UTF representation will not be longer than that 
DataOutputStream's 64K UTF format limit.
* Updated the new SchemaRecordWriter class to similarly truncate long Strings 
that will be written as UTF.
* Add tests to confirm handling of large UTF strings and various edge 
conditions of UTF string handling.

Signed-off-by: Mike Moser <[email protected]>

This closes #1469.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/376af83a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/376af83a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/376af83a

Branch: refs/heads/master
Commit: 376af83a3dcfa5361be0859b54d91d30c685494e
Parents: 2d6d771
Author: Joe Skora <[email protected]>
Authored: Thu Feb 2 18:24:56 2017 +0000
Committer: Mike Moser <[email protected]>
Committed: Fri Feb 3 20:52:32 2017 +0000

----------------------------------------------------------------------
 nifi-commons/nifi-schema-utils/pom.xml          |   5 +
 .../repository/schema/SchemaRecordWriter.java   |  52 +++++++-
 .../schema/TestSchemaRecordReaderWriter.java    | 130 +++++++++++++++++++
 .../pom.xml                                     |   5 +
 .../nifi/provenance/StandardRecordWriter.java   |  61 +++++++--
 .../TestStandardRecordReaderWriter.java         |  43 ++++++
 6 files changed, 285 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/376af83a/nifi-commons/nifi-schema-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-schema-utils/pom.xml 
b/nifi-commons/nifi-schema-utils/pom.xml
index 11c62aa..a1fecb3 100644
--- a/nifi-commons/nifi-schema-utils/pom.xml
+++ b/nifi-commons/nifi-schema-utils/pom.xml
@@ -21,6 +21,11 @@
     </parent>
     <artifactId>nifi-schema-utils</artifactId>
     <dependencies>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/376af83a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
 
b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
index 4693889..81043bc 100644
--- 
a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
+++ 
b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
@@ -17,9 +17,13 @@
 
 package org.apache.nifi.repository.schema;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.UTFDataFormatException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.List;
@@ -27,6 +31,10 @@ import java.util.Map;
 
 public class SchemaRecordWriter {
 
+    public static final int MAX_ALLOWED_UTF_LENGTH = 65_535;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(SchemaRecordWriter.class);
+
     public void writeRecord(final Record record, final OutputStream out) 
throws IOException {
         // write sentinel value to indicate that there is a record. This 
allows the reader to then read one
         // byte and check if -1. If so, the reader knows there are no more 
records. If not, then the reader
@@ -105,7 +113,7 @@ public class SchemaRecordWriter {
                 out.writeLong((Long) value);
                 break;
             case STRING:
-                out.writeUTF((String) value);
+                writeUTFLimited(out, (String) value);
                 break;
             case LONG_STRING:
                 final byte[] charArray = ((String) 
value).getBytes(StandardCharsets.UTF_8);
@@ -126,7 +134,7 @@ public class SchemaRecordWriter {
                 break;
             case UNION:
                 final NamedValue namedValue = (NamedValue) value;
-                out.writeUTF(namedValue.getName());
+                writeUTFLimited(out, namedValue.getName());
                 final Record childRecord = (Record) namedValue.getValue();
                 writeRecordFields(childRecord, out);
                 break;
@@ -136,4 +144,44 @@ public class SchemaRecordWriter {
                 break;
         }
     }
+
+    private void writeUTFLimited(final DataOutputStream out, final String 
utfString) throws IOException {
+        try {
+            out.writeUTF(utfString);
+        } catch (UTFDataFormatException e) {
+            final String truncated = utfString.substring(0, 
getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH));
+            logger.warn("Truncating repository record value!  Attempted to 
write {} chars that encode to a UTF byte length greater than "
+                            + "supported maximum ({}), truncating to {} 
chars.",
+                    utfString.length(), MAX_ALLOWED_UTF_LENGTH, 
truncated.length());
+            if (logger.isDebugEnabled()) {
+                logger.warn("String value was:\n{}", truncated);
+            }
+            out.writeUTF(truncated);
+        }
+    }
+
+
+    static int getCharsInUTFLength(final String str, final int utfLimit) {
+        // see java.io.DataOutputStream.writeUTF()
+        int strlen = str.length();
+        int utflen = 0;
+        int c;
+
+        /* use charAt instead of copying String to Char array */
+        for (int i = 0; i < strlen; i++) {
+            c = str.charAt(i);
+            if ((c >= 0x0001) & (c <= 0x007F)) {
+                utflen++;
+            } else if (c > 0x07FF) {
+                utflen += 3;
+            } else {
+                utflen += 2;
+            }
+            if (utflen > utfLimit) {
+                return i;
+            }
+        }
+        return strlen;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/376af83a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java
 
b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java
index 18548fb..5eb815a 100644
--- 
a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java
+++ 
b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.repository.schema;
 
+import static 
org.apache.nifi.repository.schema.SchemaRecordWriter.MAX_ALLOWED_UTF_LENGTH;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -26,6 +27,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -33,10 +35,18 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
 
 public class TestSchemaRecordReaderWriter {
 
+    private static Character utfCharOneByte = '$';
+    private static Character utfCharTwoByte = '¢';
+    private static Character utfCharThreeByte = '€';
+    private static String utfStringOneByte = utfCharOneByte.toString();
+    private static String utfStringTwoByte = utfCharTwoByte.toString();
+    private static String utfStringThreeByte = utfCharThreeByte.toString();
+
     @Test
     @SuppressWarnings("unchecked")
     public void testRoundTrip() throws IOException {
@@ -172,6 +182,126 @@ public class TestSchemaRecordReaderWriter {
         }
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testUTFLargerThan64k() throws IOException {
+        // Create a Record Schema
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new SimpleRecordField("int present", FieldType.INT, 
Repetition.ZERO_OR_ONE));
+        fields.add(new SimpleRecordField("string present", FieldType.STRING, 
Repetition.ZERO_OR_ONE));
+
+        final RecordSchema schema = new RecordSchema(fields);
+
+        // Create a Map of record fields to values, so that we can create a 
Record to write out
+        final Map<RecordField, Object> values = new LinkedHashMap<>();
+        values.put(createField("int present", FieldType.INT), 42);
+        final String utfString = utfStringOneByte + utfStringTwoByte + 
utfStringThreeByte;  // 3 chars and 6 utf8 bytes
+        final String seventyK = StringUtils.repeat(utfString, 21845);  // 
65,535 chars and 131070 utf8 bytes
+        assertTrue(seventyK.length() == 65535);
+        assertTrue(seventyK.getBytes("UTF-8").length == 131070);
+        values.put(createField("string present", FieldType.STRING), seventyK);
+
+        final FieldMapRecord originalRecord = new FieldMapRecord(values, 
schema);
+
+        // Write out a record and read it back in.
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            // Write the schema to the stream
+            schema.writeTo(baos);
+
+            // Write the record twice, to make sure that we're able to 
read/write multiple sequential records
+            final SchemaRecordWriter writer = new SchemaRecordWriter();
+            writer.writeRecord(originalRecord, baos);
+            writer.writeRecord(originalRecord, baos);
+
+            try (final InputStream in = new 
ByteArrayInputStream(baos.toByteArray())) {
+                // Read the Schema from the stream and create a Record Reader 
for reading records, based on this schema
+                final RecordSchema readSchema = RecordSchema.readFrom(in);
+                final SchemaRecordReader reader = 
SchemaRecordReader.fromSchema(readSchema);
+
+                // Read the records and verify the values.
+                for (int i=0; i < 2; i++) {
+                    final Record record = reader.readRecord(in);
+
+                    assertNotNull(record);
+                    assertEquals(42, record.getFieldValue("int present"));
+                    assertTrue(MAX_ALLOWED_UTF_LENGTH - 
((String)record.getFieldValue("string present")).getBytes("utf-8").length <= 3);
+                    assertEquals(32768, ((String)record.getFieldValue("string 
present")).length());
+                }
+
+                // Ensure that there is no more data.
+                assertNull(reader.readRecord(in));
+            }
+        }
+    }
+
+    @Test
+    public void testSingleCharUTFLengths() {
+        // verify handling of single characters mapping to 1, 2, and 3 utf 
byte strings
+        assertEquals("test 1 char string truncated to 0 utf bytes should be 
0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 0));
+        assertEquals("test 2 char string truncated to 0 utf bytes should be 
0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 0));
+        assertEquals("test 3 char string truncated to 0 utf bytes should be 
0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 0));
+        assertEquals("test 1 char string truncated to 1 utf bytes should be 
1", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 1));
+        assertEquals("test 2 char string truncated to 1 utf bytes should be 
0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 1));
+        assertEquals("test 3 char string truncated to 1 utf bytes should be 
0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 1));
+        assertEquals("test 1 char string truncated to 2 utf bytes should be 
1", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 2));
+        assertEquals("test 2 char string truncated to 2 utf bytes should be 
2", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 2));
+        assertEquals("test 3 char string truncated to 2 utf bytes should be 
0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 2));
+        assertEquals("test 1 char string truncated to 3 utf bytes should be 
1", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 3));
+        assertEquals("test 2 char string truncated to 3 utf bytes should be 
2", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 3));
+        assertEquals("test 3 char string truncated to 3 utf bytes should be 
3", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 3));
+    }
+
+    @Test
+    public void testMultiCharUTFLengths() {
+        // test boundary conditions as 1, 2, and 3 UTF byte chars are included 
into utf limit                                                  positions used 
by strings
+        final String testString1 = utfStringOneByte + utfStringTwoByte + 
utfStringThreeByte;                                                // char 
'abc' utf 'abbccc'
+        assertEquals("test 6 char string truncated to 0 utf bytes should be 
0", 0, SchemaRecordWriter.getCharsInUTFLength(testString1, 0)); //            
utf ''
+        assertEquals("test 6 char string truncated to 1 utf bytes should be 
1", 1, SchemaRecordWriter.getCharsInUTFLength(testString1, 1)); //            
utf 'a'
+        assertEquals("test 6 char string truncated to 2 utf bytes should be 
1", 1, SchemaRecordWriter.getCharsInUTFLength(testString1, 2)); //            
utf 'a'
+        assertEquals("test 6 char string truncated to 3 utf bytes should be 
2", 2, SchemaRecordWriter.getCharsInUTFLength(testString1, 3)); //            
utf 'abb'
+        assertEquals("test 6 char string truncated to 4 utf bytes should be 
2", 2, SchemaRecordWriter.getCharsInUTFLength(testString1, 4)); //            
utf 'abb'
+        assertEquals("test 6 char string truncated to 5 utf bytes should be 
2", 2, SchemaRecordWriter.getCharsInUTFLength(testString1, 5)); //            
utf 'abb'
+        assertEquals("test 6 char string truncated to 6 utf bytes should be 
3", 3, SchemaRecordWriter.getCharsInUTFLength(testString1, 6)); //            
utf 'abbccc'
+    }
+
+    @Test
+    public void testSmallCharUTFLengths() throws UnsupportedEncodingException {
+        final String string12b = StringUtils.repeat(utfStringOneByte + 
utfStringTwoByte + utfStringThreeByte, 2);
+
+        assertEquals("test multi-char string truncated to  0 utf bytes should 
be 0", 0, SchemaRecordWriter.getCharsInUTFLength(string12b,  0));
+        assertEquals("test multi-char string truncated to  1 utf bytes should 
be 0", 1, SchemaRecordWriter.getCharsInUTFLength(string12b,  1));
+        assertEquals("test multi-char string truncated to  2 utf bytes should 
be 0", 1, SchemaRecordWriter.getCharsInUTFLength(string12b,  2));
+        assertEquals("test multi-char string truncated to  3 utf bytes should 
be 0", 2, SchemaRecordWriter.getCharsInUTFLength(string12b,  3));
+        assertEquals("test multi-char string truncated to  4 utf bytes should 
be 0", 2, SchemaRecordWriter.getCharsInUTFLength(string12b,  4));
+        assertEquals("test multi-char string truncated to  5 utf bytes should 
be 0", 2, SchemaRecordWriter.getCharsInUTFLength(string12b,  5));
+        assertEquals("test multi-char string truncated to  6 utf bytes should 
be 0", 3, SchemaRecordWriter.getCharsInUTFLength(string12b,  6));
+        assertEquals("test multi-char string truncated to  7 utf bytes should 
be 0", 4, SchemaRecordWriter.getCharsInUTFLength(string12b,  7));
+        assertEquals("test multi-char string truncated to  8 utf bytes should 
be 0", 4, SchemaRecordWriter.getCharsInUTFLength(string12b,  8));
+        assertEquals("test multi-char string truncated to  9 utf bytes should 
be 0", 5, SchemaRecordWriter.getCharsInUTFLength(string12b,  9));
+        assertEquals("test multi-char string truncated to 10 utf bytes should 
be 0", 5, SchemaRecordWriter.getCharsInUTFLength(string12b, 10));
+        assertEquals("test multi-char string truncated to 11 utf bytes should 
be 0", 5, SchemaRecordWriter.getCharsInUTFLength(string12b, 11));
+        assertEquals("test multi-char string truncated to 12 utf bytes should 
be 0", 6, SchemaRecordWriter.getCharsInUTFLength(string12b, 12));
+    }
+
+    @Test
+    public void testLargeCharUTFLengths() {
+        final String string64k = StringUtils.repeat(utfStringOneByte + 
utfStringTwoByte + utfStringThreeByte, 21845);
+
+        assertEquals("test 64k char string should be 64k chars long", 65535, 
string64k.length());
+
+        // drop half the chars going to utf of 64k bytes -- (1+1+1) * 21845 = 
65535 chars which converts to (1+2+3) * 21845 = 131070 utf bytes so 1/2 is 
truncated
+        assertEquals("test 64k char string truncated to 65,535 utf bytes 
should be 32768", 32768, SchemaRecordWriter.getCharsInUTFLength(string64k, 
65535));
+
+        // dropping bytes off the end of utf length
+        assertEquals("test 64k char string truncated to 65,534 utf bytes 
should be 32767", 32767, SchemaRecordWriter.getCharsInUTFLength(string64k, 
65534)); // lost 2 byte char
+        assertEquals("test 64k char string truncated to 65,533 utf bytes 
should be 32767", 32767, SchemaRecordWriter.getCharsInUTFLength(string64k, 
65533));
+        assertEquals("test 64k char string truncated to 65,532 utf bytes 
should be 32766", 32766, SchemaRecordWriter.getCharsInUTFLength(string64k, 
65532)); // lost 1 byte char
+        assertEquals("test 64k char string truncated to 65,531 utf bytes 
should be 32765", 32765, SchemaRecordWriter.getCharsInUTFLength(string64k, 
65531)); // lost 3 byte char
+        assertEquals("test 64k char string truncated to 65,530 utf bytes 
should be 32765", 32765, SchemaRecordWriter.getCharsInUTFLength(string64k, 
65530));
+        assertEquals("test 64k char string truncated to 65,529 utf bytes 
should be 32765", 32765, SchemaRecordWriter.getCharsInUTFLength(string64k, 
65529));
+        assertEquals("test 64k char string truncated to 65,528 utf bytes 
should be 32764", 32764, SchemaRecordWriter.getCharsInUTFLength(string64k, 
65528)); // lost 2 byte char (again)
+    }
+
     private SimpleRecordField createField(final String fieldName, final 
FieldType type) {
         return new SimpleRecordField(fieldName, type, Repetition.ZERO_OR_ONE);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/376af83a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
index a56296f..4db4169 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
@@ -58,5 +58,10 @@
             <groupId>org.apache.lucene</groupId>
             <artifactId>lucene-queryparser</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/376af83a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index a95bd4f..076e507 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -19,6 +19,7 @@ package org.apache.nifi.provenance;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.UTFDataFormatException;
 import java.util.Collection;
 import java.util.Map;
 
@@ -34,6 +35,9 @@ import org.slf4j.LoggerFactory;
  */
 @Deprecated
 public class StandardRecordWriter extends CompressableRecordWriter implements 
RecordWriter {
+
+    public static final int MAX_ALLOWED_UTF_LENGTH = 65_535;
+
     private static final Logger logger = 
LoggerFactory.getLogger(StandardRecordWriter.class);
     public static final int SERIALIZATION_VERISON = 9;
     public static final String SERIALIZATION_NAME = 
"org.apache.nifi.provenance.PersistentProvenanceRepository";
@@ -72,7 +76,7 @@ public class StandardRecordWriter extends 
CompressableRecordWriter implements Re
         final ProvenanceEventType recordType = record.getEventType();
 
         out.writeLong(recordIdentifier);
-        out.writeUTF(record.getEventType().name());
+        writeUTFLimited(out, record.getEventType().name());
         out.writeLong(record.getEventTime());
         out.writeLong(record.getFlowFileEntryDate());
         out.writeLong(record.getEventDuration());
@@ -101,9 +105,9 @@ public class StandardRecordWriter extends 
CompressableRecordWriter implements Re
         // If Content Claim Info is present, write out a 'TRUE' followed by 
claim info. Else, write out 'false'.
         if (record.getContentClaimSection() != null && 
record.getContentClaimContainer() != null && record.getContentClaimIdentifier() 
!= null) {
             out.writeBoolean(true);
-            out.writeUTF(record.getContentClaimContainer());
-            out.writeUTF(record.getContentClaimSection());
-            out.writeUTF(record.getContentClaimIdentifier());
+            writeUTFLimited(out, record.getContentClaimContainer());
+            writeUTFLimited(out, record.getContentClaimSection());
+            writeUTFLimited(out, record.getContentClaimIdentifier());
             if (record.getContentClaimOffset() == null) {
                 out.writeLong(0L);
             } else {
@@ -117,9 +121,9 @@ public class StandardRecordWriter extends 
CompressableRecordWriter implements Re
         // If Previous Content Claim Info is present, write out a 'TRUE' 
followed by claim info. Else, write out 'false'.
         if (record.getPreviousContentClaimSection() != null && 
record.getPreviousContentClaimContainer() != null && 
record.getPreviousContentClaimIdentifier() != null) {
             out.writeBoolean(true);
-            out.writeUTF(record.getPreviousContentClaimContainer());
-            out.writeUTF(record.getPreviousContentClaimSection());
-            out.writeUTF(record.getPreviousContentClaimIdentifier());
+            writeUTFLimited(out, record.getPreviousContentClaimContainer());
+            writeUTFLimited(out, record.getPreviousContentClaimSection());
+            writeUTFLimited(out, record.getPreviousContentClaimIdentifier());
             if (record.getPreviousContentClaimOffset() == null) {
                 out.writeLong(0L);
             } else {
@@ -157,7 +161,7 @@ public class StandardRecordWriter extends 
CompressableRecordWriter implements Re
     }
 
     protected void writeUUID(final DataOutputStream out, final String uuid) 
throws IOException {
-        out.writeUTF(uuid);
+        writeUTFLimited(out, uuid);
     }
 
     protected void writeUUIDs(final DataOutputStream out, final 
Collection<String> list) throws IOException {
@@ -176,7 +180,7 @@ public class StandardRecordWriter extends 
CompressableRecordWriter implements Re
             out.writeBoolean(false);
         } else {
             out.writeBoolean(true);
-            out.writeUTF(toWrite);
+            writeUTFLimited(out, toWrite);
         }
     }
 
@@ -195,6 +199,45 @@ public class StandardRecordWriter extends 
CompressableRecordWriter implements Re
         out.write(bytes);
     }
 
+    private void writeUTFLimited(final java.io.DataOutputStream out, final 
String utfString) throws IOException {
+        try {
+            out.writeUTF(utfString);
+        } catch (UTFDataFormatException e) {
+            final String truncated = utfString.substring(0, 
getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH));
+            logger.warn("Truncating repository record value!  Attempted to 
write {} chars that encode to a UTF byte length greater than "
+                            + "supported maximum ({}), truncating to {} 
chars.",
+                    utfString.length(), MAX_ALLOWED_UTF_LENGTH, 
truncated.length());
+            if (logger.isDebugEnabled()) {
+                logger.warn("String value was:\n{}", truncated);
+            }
+            out.writeUTF(truncated);
+        }
+    }
+
+    static int getCharsInUTFLength(final String str, final int utfLimit) {
+        // see java.io.DataOutputStream.writeUTF()
+        int strlen = str.length();
+        int utflen = 0;
+        int c;
+
+        /* use charAt instead of copying String to Char array */
+        for (int i = 0; i < strlen; i++) {
+            c = str.charAt(i);
+            if ((c >= 0x0001) & (c <= 0x007F)) {
+                utflen++;
+            } else if (c > 0x07FF) {
+                utflen += 3;
+            } else {
+                utflen += 2;
+            }
+            if (utflen > utfLimit) {
+                return i;
+            }
+        }
+        return strlen;
+    }
+
+
 
     @Override
     public String toString() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/376af83a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
index cc69b18..dfa37e4 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -21,8 +21,12 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.toc.NopTocWriter;
@@ -33,6 +37,9 @@ import org.apache.nifi.stream.io.NullOutputStream;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
+import static org.junit.Assert.assertTrue;
+
 public class TestStandardRecordReaderWriter extends 
AbstractTestRecordReaderWriter {
 
 
@@ -108,6 +115,42 @@ public class TestStandardRecordReaderWriter extends 
AbstractTestRecordReaderWrit
         System.out.println("Took " + millis + " millis to read " + numEvents + 
" events");
     }
 
+    @Test
+    public void testWriteUtfLargerThan64k() throws IOException, 
InterruptedException {
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "1.txt");
+        attributes.put("uuid", UUID.randomUUID().toString());
+
+        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        final String seventyK = StringUtils.repeat("X", 70000);
+        assertTrue(seventyK.length() > 65535);
+        assertTrue(seventyK.getBytes("UTF-8").length > 65535);
+        builder.setDetails(seventyK);
+        final ProvenanceEventRecord record = builder.build();
+
+        try (final ByteArrayOutputStream headerOut = new 
ByteArrayOutputStream();
+             final DataOutputStream out = new DataOutputStream(headerOut)) {
+            out.writeUTF(PersistentProvenanceRepository.class.getName());
+            out.writeInt(9);
+        }
+
+        try (final ByteArrayOutputStream recordOut = new 
ByteArrayOutputStream();
+            final StandardRecordWriter writer = new 
StandardRecordWriter(recordOut, null, false, 0)) {
+
+            writer.writeHeader(1L);
+            recordOut.reset();
+
+            writer.writeRecord(record, 1L);
+        }
+    }
+
     @Override
     protected RecordWriter createWriter(File file, TocWriter tocWriter, 
boolean compressed, int uncompressedBlockSize) throws IOException {
         return new StandardRecordWriter(file, tocWriter, compressed, 
uncompressedBlockSize);

Reply via email to