This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 78be613a0f NIFI-10888: When inferring a schema using a Record Reader,
buffer up to 1 MB of FlowFile content for the schema inference so that when we
read the contents to obtain records we can use the buffered data. This helps in
cases of small FlowFiles by not having to seek back to the beginning of the
FlowFile every time.
78be613a0f is described below
commit 78be613a0f85b664695ea2cbfaf26163f9b8e454
Author: Mark Payne <[email protected]>
AuthorDate: Mon Nov 28 13:39:36 2022 -0500
NIFI-10888: When inferring a schema using a Record Reader, buffer up to 1
MB of FlowFile content for the schema inference so that when we read the
contents to obtain records we can use the buffered data. This helps in cases of
small FlowFiles by not having to seek back to the beginning of the FlowFile
every time.
Signed-off-by: Matthew Burgess <[email protected]>
This closes #6725
---
.../repository/io/ContentClaimInputStream.java | 56 ++++++++++++++++++++--
.../inference/InferSchemaAccessStrategy.java | 2 +-
2 files changed, 53 insertions(+), 5 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
index 74696167cd..3ace1ac318 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
@@ -22,6 +22,7 @@ import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import
org.apache.nifi.controller.repository.metrics.PerformanceTrackingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
+import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -40,6 +41,8 @@ public class ContentClaimInputStream extends InputStream {
private long bytesConsumed;
private long currentOffset; // offset into the Content Claim; will differ
from bytesRead if reset() is called after reading at least one byte or if
claimOffset > 0
private long markOffset;
+ private InputStream bufferedIn;
+ private int markReadLimit;
public ContentClaimInputStream(final ContentRepository contentRepository,
final ContentClaim contentClaim, final long claimOffset, final
PerformanceTracker performanceTracker) {
this(contentRepository, contentClaim, claimOffset, null,
performanceTracker);
@@ -54,9 +57,13 @@ public class ContentClaimInputStream extends InputStream {
this.currentOffset = claimOffset;
this.delegate = initialDelegate;
+ if (delegate != null) {
+ this.bufferedIn = new BufferedInputStream(delegate);
+ }
}
private InputStream getDelegate() throws IOException {
+ bufferedIn = null;
if (delegate == null) {
formDelegate();
}
@@ -74,7 +81,14 @@ public class ContentClaimInputStream extends InputStream {
@Override
public int read() throws IOException {
- final int value = getDelegate().read();
+ int value = -1;
+ if (bufferedIn != null) {
+ value = bufferedIn.read();
+ }
+
+ if (value < 0) {
+ value = getDelegate().read();
+ }
if (value != -1) {
bytesConsumed++;
currentOffset++;
@@ -85,7 +99,14 @@ public class ContentClaimInputStream extends InputStream {
@Override
public int read(final byte[] b) throws IOException {
- final int count = getDelegate().read(b);
+ int count = -1;
+ if (bufferedIn != null) {
+ count = bufferedIn.read(b);
+ }
+ if (count < 0) {
+ count = getDelegate().read(b);
+ }
+
if (count != -1) {
bytesConsumed += count;
currentOffset += count;
@@ -96,7 +117,14 @@ public class ContentClaimInputStream extends InputStream {
@Override
public int read(final byte[] b, final int off, final int len) throws
IOException {
- final int count = getDelegate().read(b, off, len);
+ int count = -1;
+ if (bufferedIn != null) {
+ count = bufferedIn.read(b, off, len);
+ }
+ if (count < 0) {
+ count = getDelegate().read(b, off, len);
+ }
+
if (count != -1) {
bytesConsumed += count;
currentOffset += count;
@@ -133,6 +161,10 @@ public class ContentClaimInputStream extends InputStream {
@Override
public void mark(final int readlimit) {
markOffset = currentOffset;
+ markReadLimit = readlimit;
+ if (bufferedIn != null) {
+ bufferedIn.mark(readlimit);
+ }
}
@Override
@@ -141,6 +173,13 @@ public class ContentClaimInputStream extends InputStream {
throw new IOException("Stream has not been marked");
}
+ if (bufferedIn != null && bytesConsumed <= markReadLimit) {
+ bufferedIn.reset();
+ currentOffset = markOffset;
+
+ return;
+ }
+
if (currentOffset != markOffset) {
if (delegate != null) {
delegate.close();
@@ -176,8 +215,17 @@ public class ContentClaimInputStream extends InputStream {
delegate = new
PerformanceTrackingInputStream(contentRepository.read(contentClaim),
performanceTracker);
StreamUtils.skip(delegate, claimOffset);
currentOffset = claimOffset;
+
+ if (markReadLimit > 0) {
+ final int limitLeft = (int) (markReadLimit - currentOffset);
+ if (limitLeft > 0) {
+ final InputStream limitedIn = new
LimitedInputStream(delegate, limitLeft);
+ bufferedIn = new BufferedInputStream(limitedIn);
+ bufferedIn.mark(limitLeft);
+ }
+ }
} finally {
- performanceTracker.endContentRead();;
+ performanceTracker.endContentRead();
}
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/InferSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/InferSchemaAccessStrategy.java
index 1338386d16..ab28b73b4a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/InferSchemaAccessStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/InferSchemaAccessStrategy.java
@@ -43,7 +43,7 @@ public class InferSchemaAccessStrategy<T> implements SchemaAccessStrategy {
public RecordSchema getSchema(final Map<String, String> variables, final
InputStream contentStream, final RecordSchema readSchema) throws IOException {
// We expect to be able to mark/reset any length because we expect
that the underlying stream here will be a ContentClaimInputStream, which is
able to
// re-read the content regardless of how much data is read.
- contentStream.mark(10_000_000);
+ contentStream.mark(1_000_000);
try {
final RecordSource<T> recordSource =
recordSourceFactory.create(variables, new
NonCloseableInputStream(contentStream));
final RecordSchema schema =
schemaInference.inferSchema(recordSource);