Repository: nifi Updated Branches: refs/heads/master 2d6d7710c -> 376af83a3
NIFI-3055 StandardRecordWriter Can Throw UTFDataFormatException * Updated StandardRecordWriter, even though it is now deprecated to consider the encoding behavior of java.io.DataOutputStream.writeUTF() and truncate string values such that the UTF representation will not be longer than that DataOutputStream's 64K UTF format limit. * Updated the new SchemaRecordWriter class to similarly truncate long Strings that will be written as UTF. * Add tests to confirm handling of large UTF strings and various edge conditions of UTF string handling. Signed-off-by: Mike Moser <[email protected]> This closes #1469. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/376af83a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/376af83a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/376af83a Branch: refs/heads/master Commit: 376af83a3dcfa5361be0859b54d91d30c685494e Parents: 2d6d771 Author: Joe Skora <[email protected]> Authored: Thu Feb 2 18:24:56 2017 +0000 Committer: Mike Moser <[email protected]> Committed: Fri Feb 3 20:52:32 2017 +0000 ---------------------------------------------------------------------- nifi-commons/nifi-schema-utils/pom.xml | 5 + .../repository/schema/SchemaRecordWriter.java | 52 +++++++- .../schema/TestSchemaRecordReaderWriter.java | 130 +++++++++++++++++++ .../pom.xml | 5 + .../nifi/provenance/StandardRecordWriter.java | 61 +++++++-- .../TestStandardRecordReaderWriter.java | 43 ++++++ 6 files changed, 285 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/376af83a/nifi-commons/nifi-schema-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-schema-utils/pom.xml b/nifi-commons/nifi-schema-utils/pom.xml index 11c62aa..a1fecb3 100644 --- a/nifi-commons/nifi-schema-utils/pom.xml +++ b/nifi-commons/nifi-schema-utils/pom.xml 
@@ -21,6 +21,11 @@ </parent> <artifactId>nifi-schema-utils</artifactId> <dependencies> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/376af83a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java index 4693889..81043bc 100644 --- a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java +++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java @@ -17,9 +17,13 @@ package org.apache.nifi.repository.schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.io.UTFDataFormatException; import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.List; @@ -27,6 +31,10 @@ import java.util.Map; public class SchemaRecordWriter { + public static final int MAX_ALLOWED_UTF_LENGTH = 65_535; + + private static final Logger logger = LoggerFactory.getLogger(SchemaRecordWriter.class); + public void writeRecord(final Record record, final OutputStream out) throws IOException { // write sentinel value to indicate that there is a record. This allows the reader to then read one // byte and check if -1. If so, the reader knows there are no more records. 
If not, then the reader @@ -105,7 +113,7 @@ public class SchemaRecordWriter { out.writeLong((Long) value); break; case STRING: - out.writeUTF((String) value); + writeUTFLimited(out, (String) value); break; case LONG_STRING: final byte[] charArray = ((String) value).getBytes(StandardCharsets.UTF_8); @@ -126,7 +134,7 @@ public class SchemaRecordWriter { break; case UNION: final NamedValue namedValue = (NamedValue) value; - out.writeUTF(namedValue.getName()); + writeUTFLimited(out, namedValue.getName()); final Record childRecord = (Record) namedValue.getValue(); writeRecordFields(childRecord, out); break; @@ -136,4 +144,44 @@ public class SchemaRecordWriter { break; } } + + private void writeUTFLimited(final DataOutputStream out, final String utfString) throws IOException { + try { + out.writeUTF(utfString); + } catch (UTFDataFormatException e) { + final String truncated = utfString.substring(0, getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH)); + logger.warn("Truncating repository record value! 
Attempted to write {} chars that encode to a UTF byte length greater than " + + "supported maximum ({}), truncating to {} chars.", + utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length()); + if (logger.isDebugEnabled()) { + logger.warn("String value was:\n{}", truncated); + } + out.writeUTF(truncated); + } + } + + + static int getCharsInUTFLength(final String str, final int utfLimit) { + // see java.io.DataOutputStream.writeUTF() + int strlen = str.length(); + int utflen = 0; + int c; + + /* use charAt instead of copying String to Char array */ + for (int i = 0; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) & (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + if (utflen > utfLimit) { + return i; + } + } + return strlen; + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/376af83a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java index 18548fb..5eb815a 100644 --- a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java +++ b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java @@ -17,6 +17,7 @@ package org.apache.nifi.repository.schema; +import static org.apache.nifi.repository.schema.SchemaRecordWriter.MAX_ALLOWED_UTF_LENGTH; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -26,6 +27,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; 
+import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -33,10 +35,18 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.junit.Test; public class TestSchemaRecordReaderWriter { + private static Character utfCharOneByte = '$'; + private static Character utfCharTwoByte = '¢'; + private static Character utfCharThreeByte = '€'; + private static String utfStringOneByte = utfCharOneByte.toString(); + private static String utfStringTwoByte = utfCharTwoByte.toString(); + private static String utfStringThreeByte = utfCharThreeByte.toString(); + @Test @SuppressWarnings("unchecked") public void testRoundTrip() throws IOException { @@ -172,6 +182,126 @@ public class TestSchemaRecordReaderWriter { } } + @Test + @SuppressWarnings("unchecked") + public void testUTFLargerThan64k() throws IOException { + // Create a Record Schema + final List<RecordField> fields = new ArrayList<>(); + fields.add(new SimpleRecordField("int present", FieldType.INT, Repetition.ZERO_OR_ONE)); + fields.add(new SimpleRecordField("string present", FieldType.STRING, Repetition.ZERO_OR_ONE)); + + final RecordSchema schema = new RecordSchema(fields); + + // Create a Map of record fields to values, so that we can create a Record to write out + final Map<RecordField, Object> values = new LinkedHashMap<>(); + values.put(createField("int present", FieldType.INT), 42); + final String utfString = utfStringOneByte + utfStringTwoByte + utfStringThreeByte; // 3 chars and 6 utf8 bytes + final String seventyK = StringUtils.repeat(utfString, 21845); // 65,535 chars and 131070 utf8 bytes + assertTrue(seventyK.length() == 65535); + assertTrue(seventyK.getBytes("UTF-8").length == 131070); + values.put(createField("string present", FieldType.STRING), seventyK); + + final FieldMapRecord originalRecord = new FieldMapRecord(values, schema); + + // Write out a record and 
read it back in. + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + // Write the schema to the stream + schema.writeTo(baos); + + // Write the record twice, to make sure that we're able to read/write multiple sequential records + final SchemaRecordWriter writer = new SchemaRecordWriter(); + writer.writeRecord(originalRecord, baos); + writer.writeRecord(originalRecord, baos); + + try (final InputStream in = new ByteArrayInputStream(baos.toByteArray())) { + // Read the Schema from the stream and create a Record Reader for reading records, based on this schema + final RecordSchema readSchema = RecordSchema.readFrom(in); + final SchemaRecordReader reader = SchemaRecordReader.fromSchema(readSchema); + + // Read the records and verify the values. + for (int i=0; i < 2; i++) { + final Record record = reader.readRecord(in); + + assertNotNull(record); + assertEquals(42, record.getFieldValue("int present")); + assertTrue(MAX_ALLOWED_UTF_LENGTH - ((String)record.getFieldValue("string present")).getBytes("utf-8").length <= 3); + assertEquals(32768, ((String)record.getFieldValue("string present")).length()); + } + + // Ensure that there is no more data. 
+ assertNull(reader.readRecord(in)); + } + } + } + + @Test + public void testSingleCharUTFLengths() { + // verify handling of single characters mapping to 1, 2, and 3 utf byte strings + assertEquals("test 1 char string truncated to 0 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 0)); + assertEquals("test 2 char string truncated to 0 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 0)); + assertEquals("test 3 char string truncated to 0 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 0)); + assertEquals("test 1 char string truncated to 1 utf bytes should be 1", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 1)); + assertEquals("test 2 char string truncated to 1 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 1)); + assertEquals("test 3 char string truncated to 1 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 1)); + assertEquals("test 1 char string truncated to 2 utf bytes should be 1", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 2)); + assertEquals("test 2 char string truncated to 2 utf bytes should be 2", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 2)); + assertEquals("test 3 char string truncated to 2 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 2)); + assertEquals("test 1 char string truncated to 3 utf bytes should be 1", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 3)); + assertEquals("test 2 char string truncated to 3 utf bytes should be 2", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 3)); + assertEquals("test 3 char string truncated to 3 utf bytes should be 3", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 3)); + } + + @Test + public void testMultiCharUTFLengths() { + // test boundary conditions as 1, 2, and 3 UTF byte chars are included into utf 
limit positions used by strings + final String testString1 = utfStringOneByte + utfStringTwoByte + utfStringThreeByte; // char 'abc' utf 'abbccc' + assertEquals("test 6 char string truncated to 0 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(testString1, 0)); // utf '' + assertEquals("test 6 char string truncated to 1 utf bytes should be 1", 1, SchemaRecordWriter.getCharsInUTFLength(testString1, 1)); // utf 'a' + assertEquals("test 6 char string truncated to 2 utf bytes should be 1", 1, SchemaRecordWriter.getCharsInUTFLength(testString1, 2)); // utf 'a' + assertEquals("test 6 char string truncated to 3 utf bytes should be 2", 2, SchemaRecordWriter.getCharsInUTFLength(testString1, 3)); // utf 'abb' + assertEquals("test 6 char string truncated to 4 utf bytes should be 2", 2, SchemaRecordWriter.getCharsInUTFLength(testString1, 4)); // utf 'abb' + assertEquals("test 6 char string truncated to 5 utf bytes should be 2", 2, SchemaRecordWriter.getCharsInUTFLength(testString1, 5)); // utf 'abb' + assertEquals("test 6 char string truncated to 6 utf bytes should be 3", 3, SchemaRecordWriter.getCharsInUTFLength(testString1, 6)); // utf 'abbccc' + } + + @Test + public void testSmallCharUTFLengths() throws UnsupportedEncodingException { + final String string12b = StringUtils.repeat(utfStringOneByte + utfStringTwoByte + utfStringThreeByte, 2); + + assertEquals("test multi-char string truncated to 0 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(string12b, 0)); + assertEquals("test multi-char string truncated to 1 utf bytes should be 0", 1, SchemaRecordWriter.getCharsInUTFLength(string12b, 1)); + assertEquals("test multi-char string truncated to 2 utf bytes should be 0", 1, SchemaRecordWriter.getCharsInUTFLength(string12b, 2)); + assertEquals("test multi-char string truncated to 3 utf bytes should be 0", 2, SchemaRecordWriter.getCharsInUTFLength(string12b, 3)); + assertEquals("test multi-char string truncated to 4 utf bytes should be 0", 2, 
SchemaRecordWriter.getCharsInUTFLength(string12b, 4)); + assertEquals("test multi-char string truncated to 5 utf bytes should be 0", 2, SchemaRecordWriter.getCharsInUTFLength(string12b, 5)); + assertEquals("test multi-char string truncated to 6 utf bytes should be 0", 3, SchemaRecordWriter.getCharsInUTFLength(string12b, 6)); + assertEquals("test multi-char string truncated to 7 utf bytes should be 0", 4, SchemaRecordWriter.getCharsInUTFLength(string12b, 7)); + assertEquals("test multi-char string truncated to 8 utf bytes should be 0", 4, SchemaRecordWriter.getCharsInUTFLength(string12b, 8)); + assertEquals("test multi-char string truncated to 9 utf bytes should be 0", 5, SchemaRecordWriter.getCharsInUTFLength(string12b, 9)); + assertEquals("test multi-char string truncated to 10 utf bytes should be 0", 5, SchemaRecordWriter.getCharsInUTFLength(string12b, 10)); + assertEquals("test multi-char string truncated to 11 utf bytes should be 0", 5, SchemaRecordWriter.getCharsInUTFLength(string12b, 11)); + assertEquals("test multi-char string truncated to 12 utf bytes should be 0", 6, SchemaRecordWriter.getCharsInUTFLength(string12b, 12)); + } + + @Test + public void testLargeCharUTFLengths() { + final String string64k = StringUtils.repeat(utfStringOneByte + utfStringTwoByte + utfStringThreeByte, 21845); + + assertEquals("test 64k char string should be 64k chars long", 65535, string64k.length()); + + // drop half the chars going to utf of 64k bytes -- (1+1+1) * 21845 = 65535 chars which converts to (1+2+3) * 21845 = 131070 utf bytes so 1/2 is truncated + assertEquals("test 64k char string truncated to 65,535 utf bytes should be 32768", 32768, SchemaRecordWriter.getCharsInUTFLength(string64k, 65535)); + + // dropping bytes off the end of utf length + assertEquals("test 64k char string truncated to 65,534 utf bytes should be 32767", 32767, SchemaRecordWriter.getCharsInUTFLength(string64k, 65534)); // lost 2 byte char + assertEquals("test 64k char string truncated to 65,533 
utf bytes should be 32767", 32767, SchemaRecordWriter.getCharsInUTFLength(string64k, 65533)); + assertEquals("test 64k char string truncated to 65,532 utf bytes should be 32766", 32766, SchemaRecordWriter.getCharsInUTFLength(string64k, 65532)); // lost 1 byte char + assertEquals("test 64k char string truncated to 65,531 utf bytes should be 32765", 32765, SchemaRecordWriter.getCharsInUTFLength(string64k, 65531)); // lost 3 byte char + assertEquals("test 64k char string truncated to 65,530 utf bytes should be 32765", 32765, SchemaRecordWriter.getCharsInUTFLength(string64k, 65530)); + assertEquals("test 64k char string truncated to 65,529 utf bytes should be 32765", 32765, SchemaRecordWriter.getCharsInUTFLength(string64k, 65529)); + assertEquals("test 64k char string truncated to 65,528 utf bytes should be 32764", 32764, SchemaRecordWriter.getCharsInUTFLength(string64k, 65528)); // lost 2 byte char (again) + } + private SimpleRecordField createField(final String fieldName, final FieldType type) { return new SimpleRecordField(fieldName, type, Repetition.ZERO_OR_ONE); } http://git-wip-us.apache.org/repos/asf/nifi/blob/376af83a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml index a56296f..4db4169 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml @@ -58,5 +58,10 @@ <groupId>org.apache.lucene</groupId> <artifactId>lucene-queryparser</artifactId> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>test</scope> + 
</dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/376af83a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java index a95bd4f..076e507 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java @@ -19,6 +19,7 @@ package org.apache.nifi.provenance; import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.io.UTFDataFormatException; import java.util.Collection; import java.util.Map; @@ -34,6 +35,9 @@ import org.slf4j.LoggerFactory; */ @Deprecated public class StandardRecordWriter extends CompressableRecordWriter implements RecordWriter { + + public static final int MAX_ALLOWED_UTF_LENGTH = 65_535; + private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class); public static final int SERIALIZATION_VERISON = 9; public static final String SERIALIZATION_NAME = "org.apache.nifi.provenance.PersistentProvenanceRepository"; @@ -72,7 +76,7 @@ public class StandardRecordWriter extends CompressableRecordWriter implements Re final ProvenanceEventType recordType = record.getEventType(); out.writeLong(recordIdentifier); - out.writeUTF(record.getEventType().name()); + writeUTFLimited(out, record.getEventType().name()); 
out.writeLong(record.getEventTime()); out.writeLong(record.getFlowFileEntryDate()); out.writeLong(record.getEventDuration()); @@ -101,9 +105,9 @@ public class StandardRecordWriter extends CompressableRecordWriter implements Re // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'. if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) { out.writeBoolean(true); - out.writeUTF(record.getContentClaimContainer()); - out.writeUTF(record.getContentClaimSection()); - out.writeUTF(record.getContentClaimIdentifier()); + writeUTFLimited(out, record.getContentClaimContainer()); + writeUTFLimited(out, record.getContentClaimSection()); + writeUTFLimited(out, record.getContentClaimIdentifier()); if (record.getContentClaimOffset() == null) { out.writeLong(0L); } else { @@ -117,9 +121,9 @@ public class StandardRecordWriter extends CompressableRecordWriter implements Re // If Previous Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'. 
if (record.getPreviousContentClaimSection() != null && record.getPreviousContentClaimContainer() != null && record.getPreviousContentClaimIdentifier() != null) { out.writeBoolean(true); - out.writeUTF(record.getPreviousContentClaimContainer()); - out.writeUTF(record.getPreviousContentClaimSection()); - out.writeUTF(record.getPreviousContentClaimIdentifier()); + writeUTFLimited(out, record.getPreviousContentClaimContainer()); + writeUTFLimited(out, record.getPreviousContentClaimSection()); + writeUTFLimited(out, record.getPreviousContentClaimIdentifier()); if (record.getPreviousContentClaimOffset() == null) { out.writeLong(0L); } else { @@ -157,7 +161,7 @@ public class StandardRecordWriter extends CompressableRecordWriter implements Re } protected void writeUUID(final DataOutputStream out, final String uuid) throws IOException { - out.writeUTF(uuid); + writeUTFLimited(out, uuid); } protected void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException { @@ -176,7 +180,7 @@ public class StandardRecordWriter extends CompressableRecordWriter implements Re out.writeBoolean(false); } else { out.writeBoolean(true); - out.writeUTF(toWrite); + writeUTFLimited(out, toWrite); } } @@ -195,6 +199,45 @@ public class StandardRecordWriter extends CompressableRecordWriter implements Re out.write(bytes); } + private void writeUTFLimited(final java.io.DataOutputStream out, final String utfString) throws IOException { + try { + out.writeUTF(utfString); + } catch (UTFDataFormatException e) { + final String truncated = utfString.substring(0, getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH)); + logger.warn("Truncating repository record value! 
Attempted to write {} chars that encode to a UTF byte length greater than " + + "supported maximum ({}), truncating to {} chars.", + utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length()); + if (logger.isDebugEnabled()) { + logger.warn("String value was:\n{}", truncated); + } + out.writeUTF(truncated); + } + } + + static int getCharsInUTFLength(final String str, final int utfLimit) { + // see java.io.DataOutputStream.writeUTF() + int strlen = str.length(); + int utflen = 0; + int c; + + /* use charAt instead of copying String to Char array */ + for (int i = 0; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) & (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + if (utflen > utfLimit) { + return i; + } + } + return strlen; + } + + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/nifi/blob/376af83a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java index cc69b18..dfa37e4 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java @@ -21,8 +21,12 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import 
java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordWriter; import org.apache.nifi.provenance.toc.NopTocWriter; @@ -33,6 +37,9 @@ import org.apache.nifi.stream.io.NullOutputStream; import org.junit.Ignore; import org.junit.Test; +import static org.apache.nifi.provenance.TestUtil.createFlowFile; +import static org.junit.Assert.assertTrue; + public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWriter { @@ -108,6 +115,42 @@ public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWrit System.out.println("Took " + millis + " millis to read " + numEvents + " events"); } + @Test + public void testWriteUtfLargerThan64k() throws IOException, InterruptedException { + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("filename", "1.txt"); + attributes.put("uuid", UUID.randomUUID().toString()); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + final String seventyK = StringUtils.repeat("X", 70000); + assertTrue(seventyK.length() > 65535); + assertTrue(seventyK.getBytes("UTF-8").length > 65535); + builder.setDetails(seventyK); + final ProvenanceEventRecord record = builder.build(); + + try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(headerOut)) { + out.writeUTF(PersistentProvenanceRepository.class.getName()); + out.writeInt(9); + } + + try (final ByteArrayOutputStream recordOut = new 
ByteArrayOutputStream(); + final StandardRecordWriter writer = new StandardRecordWriter(recordOut, null, false, 0)) { + + writer.writeHeader(1L); + recordOut.reset(); + + writer.writeRecord(record, 1L); + } + } + @Override protected RecordWriter createWriter(File file, TocWriter tocWriter, boolean compressed, int uncompressedBlockSize) throws IOException { return new StandardRecordWriter(file, tocWriter, compressed, uncompressedBlockSize);
