This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 5da77e8e74 NIFI-11636: Do not buffer Parquet content into memory unnecessarily
5da77e8e74 is described below
commit 5da77e8e74343f04ebe7da34ba47cbc3cbe0d4fc
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jun 2 13:21:15 2023 -0400
NIFI-11636: Do not buffer Parquet content into memory unnecessarily
NIFI-11636: Change default log level of the Parquet internal reader (InternalParquetRecordReader) to WARN, as it logs excessively at INFO level
Signed-off-by: Matt Burgess <[email protected]>
---
.../nifi-resources/src/main/resources/conf/logback.xml | 2 +-
.../java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java | 5 ++---
.../org/apache/nifi/parquet/stream/NifiSeekableInputStream.java | 8 ++++----
3 files changed, 7 insertions(+), 8 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
index 4a1e57e1f5..ae2733f880 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
@@ -119,7 +119,7 @@
<logger name="org.apache.nifi.processors.standard.LogAttribute" level="INFO"/>
<logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/>
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
-
+ <logger name="org.apache.parquet.hadoop.InternalParquetRecordReader" level="WARN" />
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
<logger name="org.apache.zookeeper.server.NIOServerCnxn" level="ERROR" />
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java
index c4ac722c7b..98e8cf3f75 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java
@@ -20,7 +20,6 @@ import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;
-import java.io.IOException;
import java.io.InputStream;
public class NifiParquetInputFile implements InputFile {
@@ -42,12 +41,12 @@ public class NifiParquetInputFile implements InputFile {
}
@Override
- public long getLength() throws IOException {
+ public long getLength() {
return length;
}
@Override
- public SeekableInputStream newStream() throws IOException {
+ public SeekableInputStream newStream() {
return new NifiSeekableInputStream(input);
}
}
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
index cd6b820536..89d2cc0c32 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
@@ -29,11 +29,11 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
public NifiSeekableInputStream(final ByteCountingInputStream input) {
super(input);
this.input = input;
- this.input.mark(Integer.MAX_VALUE);
+ this.input.mark(8192);
}
@Override
- public long getPos() throws IOException {
+ public long getPos() {
return input.getBytesConsumed();
}
@@ -47,7 +47,7 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
if (newPos < currentPos) {
// seeking backwards so first reset back to beginning of the stream then seek
input.reset();
- input.mark(Integer.MAX_VALUE);
+ input.mark(8192);
}
// must call getPos() again in case reset was called above
@@ -65,7 +65,7 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
}
@Override
- public synchronized void reset() throws IOException {
+ public synchronized void reset() {
throw new UnsupportedOperationException("Mark/reset is not supported");
}
}