This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 452ca98 NIFI-6924: When seeking to the appropriate offset for a
content claim, ensure that if there are not enough bytes in the underlying
resource claim that a ContentNotFoundException is thrown. Also cleaned up
error-handling case in StandardProcessSession to ensure that we close the
existing InputStream before calling handleContenttNotFoundException, since this
method may itself throw an Exception
452ca98 is described below
commit 452ca98c29eeda6a89c60350e4577f82138d7ef0
Author: Mark Payne <[email protected]>
AuthorDate: Tue Dec 10 11:20:01 2019 -0500
NIFI-6924: When seeking to the appropriate offset for a content claim,
ensure that if there are not enough bytes in the underlying resource claim that
a ContentNotFoundException is thrown. Also cleaned up error-handling case in
StandardProcessSession to ensure that we close the existing InputStream before
calling handleContenttNotFoundException, since this method may itself throw an
Exception
This closes #3924.
Signed-off-by: Bryan Bende <[email protected]>
---
.../repository/FileSystemRepository.java | 51 +++++++++++++---------
.../repository/StandardProcessSession.java | 18 +++++---
.../repository/TestFileSystemRepository.java | 48 ++++++++++++++++++++
3 files changed, 92 insertions(+), 25 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 8eee57d..d6e5f35 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -16,7 +16,27 @@
*/
package org.apache.nifi.controller.repository;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.io.LimitedInputStream;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.ByteArrayInputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -57,24 +77,6 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.controller.repository.io.LimitedInputStream;
-import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.file.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Is thread safe
@@ -882,11 +884,20 @@ public class FileSystemRepository implements
ContentRepository {
if (claim.getOffset() > 0L) {
try {
StreamUtils.skip(fis, claim.getOffset());
- } catch (IOException ioe) {
+ } catch (final EOFException eof) {
+ final long resourceClaimBytes;
+ try {
+ resourceClaimBytes = Files.size(path);
+ } catch (final IOException e) {
+ throw new ContentNotFoundException(claim, "Content Claim
has an offset of " + claim.getOffset()
+ + " but Resource Claim has fewer than this many bytes
(actual length of the resource claim could not be determined)");
+ }
+
+ throw new ContentNotFoundException(claim, "Content Claim has
an offset of " + claim.getOffset() + " but Resource Claim " + path + " is only
" + resourceClaimBytes + " bytes");
+ } catch (final IOException ioe) {
IOUtils.closeQuietly(fis);
throw ioe;
}
-
}
// A claim length of -1 indicates that the claim is still being
written to and we don't know
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index f7249c0..c758c3d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -56,6 +56,7 @@ import
org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
+import org.rocksdb.Checkpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -2361,7 +2362,14 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
throw new FlowFileAccessException("Failed to access ContentClaim
for " + source.toString(), e);
}
- final InputStream rawIn = getInputStream(source,
record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
+ final InputStream rawIn;
+ try {
+ rawIn = getInputStream(source, record.getCurrentClaim(),
record.getCurrentClaimOffset(), true);
+ } catch (final ContentNotFoundException nfe) {
+ handleContentNotFound(nfe, record);
+ throw nfe;
+ }
+
final InputStream limitedIn = new LimitedInputStream(rawIn,
source.getSize());
final ByteCountingInputStream countingStream = new
ByteCountingInputStream(limitedIn);
final FlowFileAccessInputStream ffais = new
FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
@@ -2375,13 +2383,13 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
try {
return ffais.read();
} catch (final ContentNotFoundException cnfe) {
- handleContentNotFound(cnfe, record);
close();
+ handleContentNotFound(cnfe, record);
throw cnfe;
} catch (final FlowFileAccessException ffae) {
LOG.error("Failed to read content from " + sourceFlowFile
+ "; rolling back session", ffae);
- rollback(true);
close();
+ rollback(true);
throw ffae;
}
}
@@ -2396,13 +2404,13 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
try {
return ffais.read(b, off, len);
} catch (final ContentNotFoundException cnfe) {
- handleContentNotFound(cnfe, record);
close();
+ handleContentNotFound(cnfe, record);
throw cnfe;
} catch (final FlowFileAccessException ffae) {
LOG.error("Failed to read content from " + sourceFlowFile
+ "; rolling back session", ffae);
- rollback(true);
close();
+ rollback(true);
throw ffae;
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index bf1a579..6ac10d5 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -21,6 +21,7 @@ import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
@@ -29,6 +30,7 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -37,10 +39,12 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
@@ -165,6 +169,50 @@ public class TestFileSystemRepository {
}
@Test
+ public void testContentNotFoundExceptionThrownIfResourceClaimTooShort()
throws IOException {
+ final File contentFile = new File("target/content_repository/0/0.bin");
+ try (final OutputStream fos = new FileOutputStream(contentFile)) {
+ fos.write("Hello World".getBytes(StandardCharsets.UTF_8));
+ }
+
+ final ResourceClaim resourceClaim = new
StandardResourceClaim(claimManager, "default", "0", "0.bin", false);
+ final StandardContentClaim existingContentClaim = new
StandardContentClaim(resourceClaim, 0);
+ existingContentClaim.setLength(11);
+
+ try (final InputStream in = repository.read(existingContentClaim)) {
+ final byte[] buff = new byte[11];
+ StreamUtils.fillBuffer(in, buff);
+ assertEquals("Hello World", new String(buff,
StandardCharsets.UTF_8));
+ }
+
+ final StandardContentClaim halfContentClaim = new
StandardContentClaim(resourceClaim, 6);
+ halfContentClaim.setLength(5);
+
+ try (final InputStream in = repository.read(halfContentClaim)) {
+ final byte[] buff = new byte[5];
+ StreamUtils.fillBuffer(in, buff);
+ assertEquals("World", new String(buff, StandardCharsets.UTF_8));
+ }
+
+ final StandardContentClaim emptyContentClaim = new
StandardContentClaim(resourceClaim, 11);
+ existingContentClaim.setLength(0);
+
+ try (final InputStream in = repository.read(emptyContentClaim)) {
+ assertEquals(-1, in.read());
+ }
+
+ final StandardContentClaim missingContentClaim = new
StandardContentClaim(resourceClaim, 12);
+ missingContentClaim.setLength(1);
+
+ try {
+ repository.read(missingContentClaim);
+ Assert.fail("Did not throw ContentNotFoundException");
+ } catch (final ContentNotFoundException cnfe) {
+ // Expected
+ }
+ }
+
+ @Test
public void testBogusFile() throws IOException {
repository.shutdown();
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH,
TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile());