This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 78be613a0f NIFI-10888: When inferring a schema using a Record Reader, 
buffer up to 1 MB of FlowFile content during schema inference so that the 
buffered data can be reused when the content is read again to obtain records. 
For small FlowFiles, this avoids having to seek back to the beginning of the 
FlowFile each time.
78be613a0f is described below

commit 78be613a0f85b664695ea2cbfaf26163f9b8e454
Author: Mark Payne <[email protected]>
AuthorDate: Mon Nov 28 13:39:36 2022 -0500

    NIFI-10888: When inferring a schema using a Record Reader, buffer up to 1 
MB of FlowFile content during schema inference so that the buffered data can be 
reused when the content is read again to obtain records. For small FlowFiles, 
this avoids having to seek back to the beginning of the FlowFile each time.
    
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #6725
---
 .../repository/io/ContentClaimInputStream.java     | 56 ++++++++++++++++++++--
 .../inference/InferSchemaAccessStrategy.java       |  2 +-
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
index 74696167cd..3ace1ac318 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
@@ -22,6 +22,7 @@ import 
org.apache.nifi.controller.repository.metrics.PerformanceTracker;
 import 
org.apache.nifi.controller.repository.metrics.PerformanceTrackingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -40,6 +41,8 @@ public class ContentClaimInputStream extends InputStream {
     private long bytesConsumed;
     private long currentOffset; // offset into the Content Claim; will differ 
from bytesRead if reset() is called after reading at least one byte or if 
claimOffset > 0
     private long markOffset;
+    private InputStream bufferedIn;
+    private int markReadLimit;
 
     public ContentClaimInputStream(final ContentRepository contentRepository, 
final ContentClaim contentClaim, final long claimOffset, final 
PerformanceTracker performanceTracker) {
         this(contentRepository, contentClaim, claimOffset, null, 
performanceTracker);
@@ -54,9 +57,13 @@ public class ContentClaimInputStream extends InputStream {
 
         this.currentOffset = claimOffset;
         this.delegate = initialDelegate;
+        if (delegate != null) {
+            this.bufferedIn = new BufferedInputStream(delegate);
+        }
     }
 
     private InputStream getDelegate() throws IOException {
+        bufferedIn = null;
         if (delegate == null) {
             formDelegate();
         }
@@ -74,7 +81,14 @@ public class ContentClaimInputStream extends InputStream {
 
     @Override
     public int read() throws IOException {
-        final int value = getDelegate().read();
+        int value = -1;
+        if (bufferedIn != null) {
+            value = bufferedIn.read();
+        }
+
+        if (value < 0) {
+            value = getDelegate().read();
+        }
         if (value != -1) {
             bytesConsumed++;
             currentOffset++;
@@ -85,7 +99,14 @@ public class ContentClaimInputStream extends InputStream {
 
     @Override
     public int read(final byte[] b) throws IOException {
-        final int count = getDelegate().read(b);
+        int count = -1;
+        if (bufferedIn != null) {
+            count = bufferedIn.read(b);
+        }
+        if (count < 0) {
+            count = getDelegate().read(b);
+        }
+
         if (count != -1) {
             bytesConsumed += count;
             currentOffset += count;
@@ -96,7 +117,14 @@ public class ContentClaimInputStream extends InputStream {
 
     @Override
     public int read(final byte[] b, final int off, final int len) throws 
IOException {
-        final int count = getDelegate().read(b, off, len);
+        int count = -1;
+        if (bufferedIn != null) {
+            count = bufferedIn.read(b, off, len);
+        }
+        if (count < 0) {
+            count = getDelegate().read(b, off, len);
+        }
+
         if (count != -1) {
             bytesConsumed += count;
             currentOffset += count;
@@ -133,6 +161,10 @@ public class ContentClaimInputStream extends InputStream {
     @Override
     public void mark(final int readlimit) {
         markOffset = currentOffset;
+        markReadLimit = readlimit;
+        if (bufferedIn != null) {
+            bufferedIn.mark(readlimit);
+        }
     }
 
     @Override
@@ -141,6 +173,13 @@ public class ContentClaimInputStream extends InputStream {
             throw new IOException("Stream has not been marked");
         }
 
+        if (bufferedIn != null && bytesConsumed <= markReadLimit) {
+            bufferedIn.reset();
+            currentOffset = markOffset;
+
+            return;
+        }
+
         if (currentOffset != markOffset) {
             if (delegate != null) {
                 delegate.close();
@@ -176,8 +215,17 @@ public class ContentClaimInputStream extends InputStream {
             delegate = new 
PerformanceTrackingInputStream(contentRepository.read(contentClaim), 
performanceTracker);
             StreamUtils.skip(delegate, claimOffset);
             currentOffset = claimOffset;
+
+            if (markReadLimit > 0) {
+                final int limitLeft = (int) (markReadLimit - currentOffset);
+                if (limitLeft > 0) {
+                    final InputStream limitedIn = new 
LimitedInputStream(delegate, limitLeft);
+                    bufferedIn = new BufferedInputStream(limitedIn);
+                    bufferedIn.mark(limitLeft);
+                }
+            }
         } finally {
-            performanceTracker.endContentRead();;
+            performanceTracker.endContentRead();
         }
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/InferSchemaAccessStrategy.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/InferSchemaAccessStrategy.java
index 1338386d16..ab28b73b4a 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/InferSchemaAccessStrategy.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/InferSchemaAccessStrategy.java
@@ -43,7 +43,7 @@ public class InferSchemaAccessStrategy<T> implements 
SchemaAccessStrategy {
     public RecordSchema getSchema(final Map<String, String> variables, final 
InputStream contentStream, final RecordSchema readSchema) throws IOException {
         // We expect to be able to mark/reset any length because we expect 
that the underlying stream here will be a ContentClaimInputStream, which is 
able to
         // re-read the content regardless of how much data is read.
-        contentStream.mark(10_000_000);
+        contentStream.mark(1_000_000);
         try {
             final RecordSource<T> recordSource = 
recordSourceFactory.create(variables, new 
NonCloseableInputStream(contentStream));
             final RecordSchema schema = 
schemaInference.inferSchema(recordSource);

Reply via email to