This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 452ca98  NIFI-6924: When seeking to the appropriate offset for a 
content claim, ensure that if there are not enough bytes in the underlying 
resource claim that a ContentNotFoundException is thrown. Also cleaned up 
error-handling case in StandardProcessSession to ensure that we close the 
existing InputStream before calling handleContentNotFound, since this 
method may itself throw an Exception
452ca98 is described below

commit 452ca98c29eeda6a89c60350e4577f82138d7ef0
Author: Mark Payne <[email protected]>
AuthorDate: Tue Dec 10 11:20:01 2019 -0500

    NIFI-6924: When seeking to the appropriate offset for a content claim, 
ensure that if there are not enough bytes in the underlying resource claim that 
a ContentNotFoundException is thrown. Also cleaned up error-handling case in 
StandardProcessSession to ensure that we close the existing InputStream before 
calling handleContentNotFound, since this method may itself throw an 
Exception
    
    This closes #3924.
    
    Signed-off-by: Bryan Bende <[email protected]>
---
 .../repository/FileSystemRepository.java           | 51 +++++++++++++---------
 .../repository/StandardProcessSession.java         | 18 +++++---
 .../repository/TestFileSystemRepository.java       | 48 ++++++++++++++++++++
 3 files changed, 92 insertions(+), 25 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 8eee57d..d6e5f35 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -16,7 +16,27 @@
  */
 package org.apache.nifi.controller.repository;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.io.LimitedInputStream;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.ByteArrayInputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -57,24 +77,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.controller.repository.io.LimitedInputStream;
-import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.file.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Is thread safe
@@ -882,11 +884,20 @@ public class FileSystemRepository implements 
ContentRepository {
         if (claim.getOffset() > 0L) {
             try {
                 StreamUtils.skip(fis, claim.getOffset());
-            } catch (IOException ioe) {
+            } catch (final EOFException eof) {
+                final long resourceClaimBytes;
+                try {
+                    resourceClaimBytes = Files.size(path);
+                } catch (final IOException e) {
+                    throw new ContentNotFoundException(claim, "Content Claim 
has an offset of " + claim.getOffset()
+                        + " but Resource Claim has fewer than this many bytes 
(actual length of the resource claim could not be determined)");
+                }
+
+                throw new ContentNotFoundException(claim, "Content Claim has 
an offset of " + claim.getOffset() + " but Resource Claim " + path + " is only 
" + resourceClaimBytes + " bytes");
+            } catch (final IOException ioe) {
                 IOUtils.closeQuietly(fis);
                 throw ioe;
             }
-
         }
 
         // A claim length of -1 indicates that the claim is still being 
written to and we don't know
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index f7249c0..c758c3d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -56,6 +56,7 @@ import 
org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.rocksdb.Checkpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -2361,7 +2362,14 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             throw new FlowFileAccessException("Failed to access ContentClaim 
for " + source.toString(), e);
         }
 
-        final InputStream rawIn = getInputStream(source, 
record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
+        final InputStream rawIn;
+        try {
+            rawIn = getInputStream(source, record.getCurrentClaim(), 
record.getCurrentClaimOffset(), true);
+        } catch (final ContentNotFoundException nfe) {
+            handleContentNotFound(nfe, record);
+            throw nfe;
+        }
+
         final InputStream limitedIn = new LimitedInputStream(rawIn, 
source.getSize());
         final ByteCountingInputStream countingStream = new 
ByteCountingInputStream(limitedIn);
         final FlowFileAccessInputStream ffais = new 
FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
@@ -2375,13 +2383,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 try {
                     return ffais.read();
                 } catch (final ContentNotFoundException cnfe) {
-                    handleContentNotFound(cnfe, record);
                     close();
+                    handleContentNotFound(cnfe, record);
                     throw cnfe;
                 } catch (final FlowFileAccessException ffae) {
                     LOG.error("Failed to read content from " + sourceFlowFile 
+ "; rolling back session", ffae);
-                    rollback(true);
                     close();
+                    rollback(true);
                     throw ffae;
                 }
             }
@@ -2396,13 +2404,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 try {
                     return ffais.read(b, off, len);
                 } catch (final ContentNotFoundException cnfe) {
-                    handleContentNotFound(cnfe, record);
                     close();
+                    handleContentNotFound(cnfe, record);
                     throw cnfe;
                 } catch (final FlowFileAccessException ffae) {
                     LOG.error("Failed to read content from " + sourceFlowFile 
+ "; rolling back session", ffae);
-                    rollback(true);
                     close();
+                    rollback(true);
                     throw ffae;
                 }
             }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index bf1a579..6ac10d5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -21,6 +21,7 @@ import ch.qos.logback.classic.Logger;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
 import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
@@ -29,6 +30,7 @@ import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -37,10 +39,12 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -165,6 +169,50 @@ public class TestFileSystemRepository {
     }
 
     @Test
+    public void testContentNotFoundExceptionThrownIfResourceClaimTooShort() throws IOException {
+        // Writes an 11-byte resource claim ("Hello World") and reads it through content claims
+        // at offsets 0, 6, 11 and 12. Offsets within the file must return the expected bytes,
+        // an offset exactly at the end of the file must yield an empty stream, and an offset
+        // beyond the end of the file must cause a ContentNotFoundException.
+        final File contentFile = new File("target/content_repository/0/0.bin");
+        try (final OutputStream fos = new FileOutputStream(contentFile)) {
+            fos.write("Hello World".getBytes(StandardCharsets.UTF_8));
+        }
+
+        final ResourceClaim resourceClaim = new StandardResourceClaim(claimManager, "default", "0", "0.bin", false);
+
+        // Offset 0, length 11: the entire resource claim.
+        final StandardContentClaim existingContentClaim = new StandardContentClaim(resourceClaim, 0);
+        existingContentClaim.setLength(11);
+
+        try (final InputStream in = repository.read(existingContentClaim)) {
+            final byte[] buff = new byte[11];
+            StreamUtils.fillBuffer(in, buff);
+            assertEquals("Hello World", new String(buff, StandardCharsets.UTF_8));
+        }
+
+        // Offset 6, length 5: the trailing "World".
+        final StandardContentClaim halfContentClaim = new StandardContentClaim(resourceClaim, 6);
+        halfContentClaim.setLength(5);
+
+        try (final InputStream in = repository.read(halfContentClaim)) {
+            final byte[] buff = new byte[5];
+            StreamUtils.fillBuffer(in, buff);
+            assertEquals("World", new String(buff, StandardCharsets.UTF_8));
+        }
+
+        // Offset 11, length 0: starts exactly at the end of the resource claim, so there is nothing to read.
+        // The length must be set on the empty claim itself, not on existingContentClaim.
+        final StandardContentClaim emptyContentClaim = new StandardContentClaim(resourceClaim, 11);
+        emptyContentClaim.setLength(0);
+
+        try (final InputStream in = repository.read(emptyContentClaim)) {
+            assertEquals(-1, in.read());
+        }
+
+        // Offset 12: one byte past the end of the 11-byte resource claim, so the content cannot exist.
+        final StandardContentClaim missingContentClaim = new StandardContentClaim(resourceClaim, 12);
+        missingContentClaim.setLength(1);
+
+        // try-with-resources closes the stream if read() unexpectedly succeeds before the test fails.
+        try (final InputStream in = repository.read(missingContentClaim)) {
+            Assert.fail("Did not throw ContentNotFoundException");
+        } catch (final ContentNotFoundException cnfe) {
+            // Expected
+        }
+    }
+
+    @Test
     public void testBogusFile() throws IOException {
         repository.shutdown();
         System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, 
TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile());

Reply via email to