[
https://issues.apache.org/jira/browse/NIFI-1118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15119295#comment-15119295
]
ASF GitHub Bot commented on NIFI-1118:
--------------------------------------
GitHub user markobean commented on a diff in the pull request:
https://github.com/apache/nifi/pull/135#discussion_r50989494
--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
@@ -230,100 +270,112 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final ProcessorLog logger = getLogger();
final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
final int splitCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
+ final double maxFragmentSize;
+ if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
+ maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B);
+ } else {
+ maxFragmentSize = Integer.MAX_VALUE;
+ }
+ final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
final boolean removeTrailingNewlines = context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
-
final ObjectHolder<String> errorMessage = new ObjectHolder<>(null);
- final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
-
final long startNanos = System.nanoTime();
final List<FlowFile> splits = new ArrayList<>();
+
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
- // if we have header lines, copy them into a ByteArrayOutputStream
+ // Identify header, if any
final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
- final int headerLinesCopied = readLines(in, headerCount, headerStream, true);
- if (headerLinesCopied < headerCount) {
- errorMessage.set("Header Line Count is set to " + headerCount + " but file had only " + headerLinesCopied + " lines");
+ final SplitInfo headerInfo = readHeader(headerCount, headerMarker, in, headerStream, true);
+ if (headerInfo.lengthLines < headerCount) {
+ errorMessage.set("Header Line Count is set to " + headerCount + " but file had only "
+ + headerInfo.lengthLines + " lines");
return;
}
while (true) {
- if (headerCount > 0) {
- // if we have header lines, create a new FlowFile, copy the header lines to that file,
- // and then start copying lines
- final IntegerHolder linesCopied = new IntegerHolder(0);
- FlowFile splitFile = session.create(flowFile);
- try {
- splitFile = session.write(splitFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream rawOut) throws IOException {
- try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
+ FlowFile splitFile = session.create(flowFile);
+ final SplitInfo flowFileInfo = new SplitInfo();
+
+ // if we have header lines, write them out
+ // and then start copying lines
+ try {
+ splitFile = session.write(splitFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream rawOut) throws IOException {
+ try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
+ long lineCount = 0;
+ long byteCount = 0;
+ // Process header
+ if (headerInfo.lengthLines > 0) {
+ flowFileInfo.lengthBytes = headerInfo.lengthBytes;
+ byteCount = headerInfo.lengthBytes;
headerStream.writeTo(out);
- linesCopied.set(readLines(in, splitCount, out, !removeTrailingNewlines));
+ }
+
+ // Process body
+ while (true) {
+ final ByteArrayOutputStream dataStream = new ByteArrayOutputStream();
+ in.mark(1);
--- End diff --
This "in.mark()" call is used to roll back the entire line if adding it to
the current split would exceed maxFragmentSize (the matching reset is on
line 336). However, its readlimit should not be 1; it should be the maximum
possible length of a line. Since the length of a given line is unknown, I
recommend using "Integer.MAX_VALUE".
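To illustrate the readlimit point, here is a minimal sketch against the JDK's java.io.BufferedInputStream, which the diff wraps the raw stream in. The class MarkResetDemo, the helper rollBackLine, and the 4-byte buffer are hypothetical choices for the demo, and the sketch calls mark() on the BufferedInputStream directly rather than through NiFi's ByteCountingInputStream, which it does not model. The behavior shown is that of the OpenJDK implementation; the general InputStream contract only says a stream is allowed to forget the mark once more than readlimit bytes have been read.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class MarkResetDemo {

    /**
     * Marks the stream, consumes one line (up to and including '\n'), then
     * tries to reset to the mark, as a split would do when the line does not
     * fit. Returns true if the reset succeeded.
     */
    static boolean rollBackLine(InputStream in, int readLimit) throws IOException {
        in.mark(readLimit);
        int b;
        while ((b = in.read()) != -1 && b != '\n') {
            // Consume the line; real code would copy these bytes into the split.
        }
        try {
            in.reset();
            return true;
        } catch (IOException e) {
            // BufferedInputStream reports "Resetting to invalid mark" here
            // after it has dropped the mark during a buffer refill.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "a line longer than the buffer\nnext\n"
                .getBytes(StandardCharsets.US_ASCII);

        // A 4-byte buffer forces several refills while reading the first line.
        InputStream small = new BufferedInputStream(new ByteArrayInputStream(data), 4);
        System.out.println("mark(1): " + rollBackLine(small, 1)); // false

        InputStream large = new BufferedInputStream(new ByteArrayInputStream(data), 4);
        System.out.println("mark(Integer.MAX_VALUE): "
                + rollBackLine(large, Integer.MAX_VALUE)); // true
        // After the reset, reading starts again at the beginning of the line.
        System.out.println("first byte after reset: " + (char) large.read()); // a
    }
}
```

The mark(1) version can look correct in testing: BufferedInputStream only discards the mark when it runs out of buffer space during a refill, so with the default 8192-byte buffer a line that is already buffered can still be reset, and failures appear only for some lines, typically long ones that force refills. With Integer.MAX_VALUE the buffer instead grows to hold the whole marked line, so memory use scales with the longest line.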
> Enable SplitText processor to limit line length and filter header lines
> -----------------------------------------------------------------------
>
> Key: NIFI-1118
> URL: https://issues.apache.org/jira/browse/NIFI-1118
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Mark Bean
> Assignee: Joe Skora
> Fix For: 0.6.0
>
>
> Add the following functionality to the SplitText processor:
> 1) Maximum size limit of the split file(s)
> A new split file will be created if adding the next line to the current
> split file would cause it to exceed a user-defined maximum file size.
> 2) Header line marker
> User-defined character(s) can be used to identify the header line(s) of the
> data file, rather than a predetermined number of lines.
> These changes are additions; they do not replace any existing property or
> behavior. For the header line marker, the existing "Header Line Count"
> property must be zero for the new property and behavior to be used.
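The two features described in the issue can be sketched over an in-memory list of lines. This is a hypothetical illustration, not the code in the pull request (which streams the content and uses mark/reset): SplitSketch, headerLines, and split are invented names, the marker is assumed to be a line prefix, sizes are counted as UTF-8 bytes plus one byte for each '\n' terminator, and Line Split Count and Remove Trailing Newlines are not modeled. Per the description, the marker is consulted only when Header Line Count is zero.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitSketch {

    /** Returns how many leading lines form the header. */
    static int headerLines(List<String> lines, int headerLineCount, String headerMarker) {
        if (headerLineCount > 0) {
            return Math.min(headerLineCount, lines.size());
        }
        // Marker mode, used only when Header Line Count is zero:
        // every leading line that starts with the marker is a header line.
        int n = 0;
        if (headerMarker != null && !headerMarker.isEmpty()) {
            while (n < lines.size() && lines.get(n).startsWith(headerMarker)) {
                n++;
            }
        }
        return n;
    }

    /** Size of one line in bytes, counting its '\n' terminator. */
    static long size(String line) {
        return line.getBytes(StandardCharsets.UTF_8).length + 1L;
    }

    /** Groups body lines into splits whose total size, header included, stays within maxBytes. */
    static List<List<String>> split(List<String> lines, int headerLineCount,
                                    String headerMarker, long maxBytes) {
        int h = headerLines(lines, headerLineCount, headerMarker);
        List<String> header = lines.subList(0, h);
        long headerBytes = 0;
        for (String line : header) {
            headerBytes += size(line);
        }

        List<List<String>> splits = new ArrayList<>();
        List<String> current = new ArrayList<>(header);
        long currentBytes = headerBytes;
        for (String line : lines.subList(h, lines.size())) {
            boolean hasBody = current.size() > h;
            // Start a new split if adding this line would exceed the limit.
            // A split with no body lines always accepts the line, so an
            // oversized line still lands in a split of its own (with the header).
            if (hasBody && currentBytes + size(line) > maxBytes) {
                splits.add(current);
                current = new ArrayList<>(header);
                currentBytes = headerBytes;
            }
            current.add(line);
            currentBytes += size(line);
        }
        if (current.size() > h) {
            splits.add(current);
        }
        return splits;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("#id,name", "1,alpha", "2,beta", "3,gamma");
        for (List<String> s : split(lines, 0, "#", 24)) {
            System.out.println(s);
        }
        // [#id,name, 1,alpha, 2,beta]
        // [#id,name, 3,gamma]
    }
}
```

The 24-byte limit in main is chosen so that the header (9 bytes) plus the first two body lines (8 + 7 bytes) fill the first split exactly, and the third line starts a second split that repeats the header.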
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)