This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new cf41c10  NIFI-5879: Fixed bug in FileSystemRepository that can occur 
if an InputStream is obtained, then more data is written to the Content Claim - 
the InputStream would end before allowing the sequential data to be read. Also 
fixed bugs in LimitedInputStream related to available(), mark(), and reset() 
and the corresponding unit tests. Additionally, found that one call to 
StandardProcessSession.read() was not properly flushing the output of any 
Content Claim that has been writte [...]
cf41c10 is described below

commit cf41c10546d940aa86d0287bbeb2cdaf4a6c8a2a
Author: Mark Payne <[email protected]>
AuthorDate: Thu Dec 6 16:22:29 2018 -0500

    NIFI-5879: Fixed bug in FileSystemRepository that can occur if an 
InputStream is obtained, then more data is written to the Content Claim - the 
InputStream would end before allowing the sequential data to be read. Also 
fixed bugs in LimitedInputStream related to available(), mark(), and reset() 
and the corresponding unit tests. Additionally, found that one call to 
StandardProcessSession.read() was not properly flushing the output of any 
Content Claim that has been written to before at [...]
    
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #3207
---
 .../repository/FileSystemRepository.java           | 11 +++-
 .../repository/StandardProcessSession.java         |  4 +-
 .../repository/io/LimitedInputStream.java          | 34 ++++++++---
 .../repository/TestFileSystemRepository.java       | 69 ++++++++++++++--------
 .../repository/io/TestLimitedInputStream.java      | 17 +++---
 5 files changed, 94 insertions(+), 41 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index c041f5c..125cd50 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -864,9 +864,16 @@ public class FileSystemRepository implements 
ContentRepository {
 
         }
 
-        // see javadocs for claim.getLength() as to why we do this.
+        // A claim length of -1 indicates that the claim is still being 
written to and we don't know
+        // the length. In this case, we don't limit the Input Stream. If the 
Length has been populated, though,
+        // it is possible that the Length could then be extended. However, we 
do want to avoid ever allowing the
+        // stream to read past the end of the Content Claim. To accomplish 
this, we use a LimitedInputStream but
+        // provide a LongSupplier for the length instead of a Long value. this 
allows us to continue reading until
+        // we get to the end of the Claim, even if the Claim grows. This may 
happen, for instance, if we obtain an
+        // InputStream for this claim, then read from it, write more to the 
claim, and then attempt to read again. In
+        // such a case, since we have written to that same Claim, we should 
still be able to read those bytes.
         if (claim.getLength() >= 0) {
-            return new LimitedInputStream(fis, claim.getLength());
+            return new LimitedInputStream(fis, claim::getLength);
         } else {
             return fis;
         }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 4354dc4..cc3ac19 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2267,7 +2267,9 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         final StandardRepositoryRecord record = getRecord(source);
 
         try {
-            ensureNotAppending(record.getCurrentClaim());
+            final ContentClaim currentClaim = record.getCurrentClaim();
+            ensureNotAppending(currentClaim);
+            claimCache.flush(currentClaim);
         } catch (final IOException e) {
             throw new FlowFileAccessException("Failed to access ContentClaim 
for " + source.toString(), e);
         }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/LimitedInputStream.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/LimitedInputStream.java
index 74597ae..7c32cc8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/LimitedInputStream.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/LimitedInputStream.java
@@ -18,21 +18,36 @@ package org.apache.nifi.controller.repository.io;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Objects;
+import java.util.function.LongSupplier;
 
 public class LimitedInputStream extends InputStream {
 
     private final InputStream in;
-    private long limit;
+    private final long limit;
+    private final LongSupplier limitSupplier;
     private long bytesRead = 0;
+    private long markOffset = -1L;
+
+    public LimitedInputStream(final InputStream in, final LongSupplier 
limitSupplier) {
+        this.in = in;
+        this.limitSupplier = Objects.requireNonNull(limitSupplier);
+        this.limit = -1;
+    }
 
     public LimitedInputStream(final InputStream in, final long limit) {
         this.in = in;
         this.limit = limit;
+        this.limitSupplier = null;
+    }
+
+    private long getLimit() {
+        return limitSupplier == null ? limit : limitSupplier.getAsLong();
     }
 
     @Override
     public int read() throws IOException {
-        if (bytesRead >= limit) {
+        if (bytesRead >= getLimit()) {
             return -1;
         }
 
@@ -45,6 +60,7 @@ public class LimitedInputStream extends InputStream {
 
     @Override
     public int read(final byte[] b) throws IOException {
+        final long limit = getLimit();
         if (bytesRead >= limit) {
             return -1;
         }
@@ -60,6 +76,7 @@ public class LimitedInputStream extends InputStream {
 
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
+        final long limit = getLimit();
         if (bytesRead >= limit) {
             return -1;
         }
@@ -75,14 +92,14 @@ public class LimitedInputStream extends InputStream {
 
     @Override
     public long skip(final long n) throws IOException {
-        final long skipped = in.skip(Math.min(n, limit - bytesRead));
+        final long skipped = in.skip(Math.min(n, getLimit() - bytesRead));
         bytesRead += skipped;
         return skipped;
     }
 
     @Override
     public int available() throws IOException {
-        return (int)(limit - bytesRead);
+        return (int)(getLimit() - bytesRead);
     }
 
     @Override
@@ -93,8 +110,7 @@ public class LimitedInputStream extends InputStream {
     @Override
     public void mark(int readlimit) {
         in.mark(readlimit);
-        limit -= bytesRead;
-        bytesRead = 0;
+        markOffset = bytesRead;
     }
 
     @Override
@@ -105,6 +121,10 @@ public class LimitedInputStream extends InputStream {
     @Override
     public void reset() throws IOException {
         in.reset();
-        bytesRead = 0;
+
+        if (markOffset >= 0) {
+            bytesRead = markOffset;
+        }
+        markOffset = -1;
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index 3ecff71..bf1a579 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -16,12 +16,23 @@
  */
 package org.apache.nifi.controller.repository;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.util.DiskUtils;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -45,24 +56,12 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
-import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
-import org.apache.nifi.controller.repository.util.DiskUtils;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.Logger;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeFalse;
 
 public class TestFileSystemRepository {
@@ -192,6 +191,28 @@ public class TestFileSystemRepository {
     }
 
     @Test
+    public void testReadClaimThenWriteThenReadMore() throws IOException {
+        final ContentClaim claim = repository.create(false);
+
+        final OutputStream out = repository.write(claim);
+        out.write("hello".getBytes());
+        out.flush();
+
+        final InputStream in = repository.read(claim);
+        final byte[] buffer = new byte[5];
+        StreamUtils.fillBuffer(in, buffer);
+
+        assertEquals("hello", new String(buffer));
+
+        out.write("good-bye".getBytes());
+        out.close();
+
+        final byte[] buffer2 = new byte[8];
+        StreamUtils.fillBuffer(in, buffer2);
+        assertEquals("good-bye", new String(buffer2));
+    }
+
+    @Test
     public void testClaimantCounts() throws IOException {
         final ContentClaim claim = repository.create(true);
         assertNotNull(claim);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestLimitedInputStream.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestLimitedInputStream.java
index 7b1e64d..129fed6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestLimitedInputStream.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestLimitedInputStream.java
@@ -16,14 +16,14 @@
  */
 package org.apache.nifi.controller.repository.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.fail;
+import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class TestLimitedInputStream {
 
@@ -70,9 +70,11 @@ public class TestLimitedInputStream {
     @Test
     public void testSkip() throws Exception {
         final LimitedInputStream lis = new LimitedInputStream(bais, 4);
+        lis.mark(4);
         assertEquals(3, lis.read(buffer3));
         assertEquals(1, lis.skip(data.length));
         lis.reset();
+        lis.mark(4);
         assertEquals(4, lis.skip(7));
         lis.reset();
         assertEquals(2, lis.skip(2));
@@ -91,7 +93,7 @@ public class TestLimitedInputStream {
     @Test
     public void testAvailable() throws Exception {
         final LimitedInputStream lis = new LimitedInputStream(bais, 4);
-        assertNotEquals(data.length, lis.available());
+        assertEquals(4, lis.available());
         lis.reset();
         assertEquals(4, lis.available());
         assertEquals(1, lis.read(buffer3, 0, 1));
@@ -107,14 +109,15 @@ public class TestLimitedInputStream {
     @Test
     public void testMark() throws Exception {
         final LimitedInputStream lis = new LimitedInputStream(bais, 6);
+        lis.mark(1000);
         assertEquals(3, lis.read(buffer3));
         assertEquals(3, lis.read(buffer10));
         lis.reset();
-        assertEquals(3, lis.read(buffer3));
         lis.mark(1000);
+        assertEquals(3, lis.read(buffer3));
         assertEquals(3, lis.read(buffer10));
         lis.reset();
-        assertEquals(3, lis.read(buffer10));
+        assertEquals(6, lis.read(buffer10));
     }
 
 }

Reply via email to