If I haven't worn out my welcome, here is the simplified code that should demonstrate either that I have miscoded your suggestions or that the API doesn't in fact work as advertised. First, the output. The code, both the JUnit test and the processor, is attached; the files are pretty small.

Much thanks,
Russ

This is the input stream first time around (before copying) ===================================
* * * session.read( flowfile );
      Here's what's in input stream:
<cxml>
  <document>
    This is the original document.
  </document>
  <metadata>
    <date_of_service>2016-06-28 13:23</date_of_service>
  </metadata>
  <demographics>
    <date_of_birth>1980-07-01</date_of_birth>
    <age>36</age>
  </demographics>
</cxml>

And now, let's copy some of the input stream to the output stream =============================
* * * flowfile = session.write( flowfile, new StreamCallback() ...
      Copying input stream to output stream up to </document>...
      The output stream has in it at this point:
<cxml>
  <document>
    This is the original document.
  </document>

[1. When we examine the output stream, it has what we expect.]

After copying, can we reopen input stream intact and does outputstream have what we think? ====
* * * flowfile = session.write( flowfile, new StreamCallback() ...
      Here's what's in input stream:
<cxml>
  <document>
    This is the original document.
  </document>

[2. The input stream as reported just above is truncated by exactly the content we did
      not copy to the output stream. We expected to see the entire, original file, but the
      second half is gone.]

      Here's what's in the output stream at this point:
 (nothing)

[3. The content we copied to the output stream has disappeared. Does it disappear simply
    because we looked at it (printed it out here)?]


On 3/29/20 5:05 AM, Joe Witt wrote:
Russell

I recommend writing very simple code that does two successive read/write
operations on basic data so you can make sure the api work/as expected.
Then add the xml bits.

Thanks

On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen <[email protected]> wrote:

If these files are only a few MB at the most, you can also just export them
to a ByteArrayOutputStream. Just a thought.
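
As a rough illustration of the buffering idea above, here is a minimal sketch in plain Java. It is not NiFi code: `BufferedContent`, `from()`, `open()` and `asString()` are hypothetical names made up for this example, and the approach only makes sense when the content comfortably fits in memory.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of NiFi): drain a small stream once, then re-read it at will. */
final class BufferedContent
{
  private final byte[] bytes;

  private BufferedContent( byte[] bytes ) { this.bytes = bytes; }

  /** Copies the whole stream into memory; only sensible for content of a few MB at most. */
  static BufferedContent from( InputStream in )
  {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    byte[] chunk = new byte[ 8192 ];
    int    count;

    try
    {
      while( ( count = in.read( chunk ) ) != -1 )
        buffer.write( chunk, 0, count );
    }
    catch( IOException e )
    {
      throw new UncheckedIOException( e );
    }

    return new BufferedContent( buffer.toByteArray() );
  }

  /** Each call returns a fresh stream positioned at the start of the buffered bytes. */
  ByteArrayInputStream open() { return new ByteArrayInputStream( bytes ); }

  /** Decodes the buffered bytes as UTF-8 (an assumption about the content's encoding). */
  String asString() { return new String( bytes, StandardCharsets.UTF_8 ); }
}
```

With something like this, one pass could copy the first part from `open()` and a second pass could hand another `open()` to the SAX parser, since each stream starts from the beginning independently of the other.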

On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <[email protected]> wrote:

Joe and Mike,

Sadly, I was not able to get very far on this. It seems that, to the
extent that I copy the first half of the contents of the input stream, I
lose what comes after when I try to read again: basically, the second
half comprising the <metadata> and <demographics> elements, which I was
hoping to SAX-parse. Here's code and output. I have highlighted the
output to make it easier to read.

    try
    {
      InputStream inputStream = session.read( flowfile );
      System.out.println( "This is the input stream first time around (before copying to output stream)..." );
      System.out.println( StreamUtilities.fromStream( inputStream ) );
      inputStream.close();
    }
    catch( IOException e )
    {
      e.printStackTrace();
    }

    flowfile = session.write( flowfile, new StreamCallback()
    {
      @Override
      public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
      {
        System.out.println( "And now, let's copy..." );
        CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream, outputStream );
      }
    } );

    try
    {
      InputStream inputStream = session.read( flowfile );
      System.out.println( "This is the input stream second time around (after copying)..." );
      System.out.println( StreamUtilities.fromStream( inputStream ) );
      inputStream.close();
    }
    catch( IOException e )
    {
      e.printStackTrace();
    }

    // ...on to SAX parser which dies because the input has been truncated to
    // exactly what was written out to the output stream


Output of above:

This is the input stream first time around (before copying to output stream)...
<cxml>
    <document>
      This is the original document.
    </document>
    <metadata>
      <date_of_service>2016-06-28 13:23</date_of_service>
    </metadata>
    <demographics>
      <date_of_birth>1980-07-01</date_of_birth>
      <age>36</age>
    </demographics>
</cxml>

And now, let's copy...
This is the input stream second time around (after copying)...
<cxml>
    <document>
      This is the original document.
    </document>
And now, we'll go on to the SAX parser...
<cxml> <document> This is the original document. </document>
[pool-1-thread-1] ERROR [...] SAX ruleparser error: org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML document structures must start and end within the same entity.


I left off the code that prints, "And now, we'll go on to the SAX
parser..." It's in the next flowfile = session.write( ... ). I have unit
tests that verify the good functioning of
copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
"file" is truncated; SAX finds the first "half" just fine, but there is
no second "half". If I comment out copying from input stream to output
stream, the error doesn't occur--the whole document is there.

Thanks for looking at this again if you can,
Russ

On 3/27/20 3:08 PM, Joe Witt wrote:
You should be able to call write as many times as you need. Just keep
using the resulting flowfile reference in the next call.
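
The advice above, threading the returned reference through successive calls, can be pictured with a toy model in plain Java. This is not the NiFi API: `ToyFlowFile` and its `Callback` interface are hypothetical stand-ins invented for this sketch. It assumes, as I understand `session.write()` with a `StreamCallback` to behave, that each write replaces the content with exactly what the callback wrote to the output stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Toy stand-in for a flowfile (hypothetical, NOT the NiFi API): immutable content plus a replacing write(). */
final class ToyFlowFile
{
  /** Same shape as a two-stream callback: read the old content, write the new content. */
  interface Callback
  {
    void process( InputStream in, OutputStream out ) throws IOException;
  }

  private final byte[] content;

  ToyFlowFile( String text )            { this( text.getBytes( StandardCharsets.UTF_8 ) ); }
  private ToyFlowFile( byte[] content ) { this.content = content; }

  /** Returns a NEW ToyFlowFile whose content is exactly what the callback wrote, nothing more. */
  ToyFlowFile write( Callback callback )
  {
    ByteArrayOutputStream out = new ByteArrayOutputStream();

    try
    {
      callback.process( new ByteArrayInputStream( content ), out );
    }
    catch( IOException e )
    {
      throw new UncheckedIOException( e );
    }

    return new ToyFlowFile( out.toByteArray() );
  }

  /** Decodes the current content as UTF-8. */
  String text() { return new String( content, StandardCharsets.UTF_8 ); }
}
```

In this model, a callback that copies only the first three bytes of "AAABBB" produces a result holding "AAA", and a following write whose callback writes nothing produces empty content. Whether that is what happens in the real processor is for the thread to settle; the sketch only shows what "keep using the resulting reference" implies if each write replaces the content.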

On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <[email protected]> wrote:

Mike,

Many thanks for responding. Do you mean to say that all I have to do is
something like this?

      public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
      {
        FlowFile flowfile = session.get();
        ...

        // this will hold the output stream of our resulting flowfile...
        AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();

        /* Do some processing on the in-coming flowfile then close its input stream, but
         * save the output stream for continued use.
         */
        session.write( flowfile, new StreamCallback()
        {
          @Override
          public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
          {
            savedOutputStream.set( outputStream );
            ...

            // processing puts some output on the output stream...
            outputStream.write( etc. );

            inputStream.close();
          }
        } );

        /* Start over doing different processing on the (same/reopened) in-coming flowfile
         * continuing to use the original output stream. It's our responsibility to close
         * the saved output stream; NiFi closes the unused output stream opened, but
         * ignored by us.
         */
        session.write( flowfile, new StreamCallback()
        {
          @Override
          public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
          {
            outputStream = savedOutputStream.get(); // (discard the new output stream)
            ...

            // processing puts (some more) output on the original output stream...
            outputStream.write( etc. );

            outputStream.close();
          }
        } );

        session.transfer( flowfile, etc. );
      }

I'm wondering if this will work to "discard" the new output stream
opened for me (the second time) and replace it with the original one,
which was probably closed when the first call to session.write()
finished. What's on these streams is way too big for me to hold in
temporary memory, say, in a ByteArrayOutputStream.

Russ

On 3/27/20 10:03 AM, Mike Thomsen wrote:
session.read(FlowFile) just gives you an InputStream. You should be able to
rerun that as many times as you want provided you properly close it.
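
The open, read, close, then open again pattern described above can be sketched in plain Java without holding the source in memory between passes. This is only an analogy: a temporary file stands in for the flowfile content, `ReopenDemo`, `readAll()` and `readTwice()` are hypothetical names, and `InputStream.readAllBytes()` assumes Java 9 or later.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical demo (not NiFi code): the same source can be read repeatedly if each stream is closed. */
final class ReopenDemo
{
  /** Opens a fresh stream over the file, reads it to the end and closes it via try-with-resources. */
  static String readAll( Path path )
  {
    try( InputStream in = Files.newInputStream( path ) )
    {
      return new String( in.readAllBytes(), StandardCharsets.UTF_8 );
    }
    catch( IOException e )
    {
      throw new UncheckedIOException( e );
    }
  }

  /** Writes text to a temporary file, then reads it back through two independent streams. */
  static String[] readTwice( String text )
  {
    try
    {
      Path path = Files.createTempFile( "reopen-demo", ".xml" );

      try
      {
        Files.write( path, text.getBytes( StandardCharsets.UTF_8 ) );
        return new String[] { readAll( path ), readAll( path ) };
      }
      finally
      {
        Files.deleteIfExists( path );
      }
    }
    catch( IOException e )
    {
      throw new UncheckedIOException( e );
    }
  }
}
```

Both elements of the array returned by `readTwice()` hold the full text, because nothing replaced the underlying file between the two reads.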

On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <[email protected]> wrote:

In my custom processor, I'm using a SAX parser to process an incoming
flowfile that's in XML. However, this particular XML is in essence two
different files, and I would like to split off, read and process the
first "half" (which starts a couple of lines, that is, XML elements,
into the file) without using the SAX parser. At the end, I would stream
the output of the first half, then the SAX-processed second half.

So, in short:

    1. process the incoming flowfile for the early content not using SAX,
       but merely copying as-is; at all cost I must avoid "reassembling"
       the first half using my SAX handler (what I'm doing now),
    2. output the first part down the output stream to the resulting flowfile,
    3. (re)process the incoming flowfile using SAX (and I can just skip
       over the first bit) and spit the result of this second part out
       down the output stream of the resulting flowfile.

I guess this is tantamount to asking how, in Java, I can read an input
stream twice (or one-half plus one times). Maybe it's less a NiFi
developer question and more a Java question. I have looked at it that
way too, but, if one of you knows (particularly NiFi) best practice, I
would very much like to hear about it.

Thanks.




package com.windofkeltia.processor;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;

@SuppressWarnings( "DuplicatedCode" )
@CapabilityDescription( "Test reading the input stream and writing output stream splitting, then seeing "
      + "if the input stream gets reopened intact and the contents of the output stream are obtainable."
)
public class ReadSplitWrite extends AbstractProcessor
{
  @Override
  public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
  {
    FlowFile flowfile = session.get(); if( flowfile == null ) { context.yield(); return; }

    // -----------------------------------------------------------------------------------------------------------------
    System.out.println( "This is the input stream first time around (before copying) ===================================" );
    System.out.println( "* * * session.read( flowfile );" );
    session.read( flowfile, new InputStreamCallback()
    {
      @Override public void process( InputStream inputStream ) throws IOException
      {
        System.out.println( "      Here's what's in input stream:" );
        System.out.println( fromStream( inputStream ) );
      }
    } );

    // -----------------------------------------------------------------------------------------------------------------
    System.out.println( "And now, let's copy some of the input stream to the output stream =============================" );
    System.out.println( "* * * flowfile = session.write( flowfile, new StreamCallback() ..." );
    flowfile = session.write( flowfile, new StreamCallback()
    {
      @Override
      public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
      {
        System.out.println( "      Copying input stream to output stream up to </document>..." );
        copyCxmlHeaderAndDocumentToOutput( inputStream, outputStream );
        System.out.println( "      The output stream has in it at this point:" );
        System.out.println( fromStream( outputStream ) + "\n" );
      }
    } );

    System.out.println( "[1. When we examine the output stream, it has what we expect.]\n" );

    // -----------------------------------------------------------------------------------------------------------------
    System.out.println( "After copying, can we reopen input stream intact and does outputstream have what we think? ====" );
    System.out.println( "* * * flowfile = session.write( flowfile, new StreamCallback() ..." );
    flowfile = session.write( flowfile, new StreamCallback()
    {
      @Override
      public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
      {
        System.out.println( "      Here's what's in input stream:" );
        System.out.println( fromStream( inputStream ) );
        System.out.println( "\n[2. The input stream as reported just above is truncated by exactly the content we did" );
        System.out.println( "      not copy to the output stream. We expected to see the entire, original file, but the" );
        System.out.println( "      second half is gone.]\n" );
        System.out.println( "      Here's what's in the output stream at this point:" );
        System.out.println( " (nothing)" );
        System.out.println( fromStream( inputStream ) );
      }
    } );

    System.out.println( "[3. The content we copied to the output stream has disappeared. Does it disappear simply" );
    System.out.println( "    because we looked at it (printed it out here)?]\n" );
    // -----------------------------------------------------------------------------------------------------------------

    session.transfer( flowfile, SUCCESS );
  }

  private static String fromStream( InputStream inputStream ) throws IOException
  {
    int           ch;
    StringBuilder sb = new StringBuilder();

    while( ( ch = inputStream.read() ) != -1 )
      sb.append( ( char ) ch );

    return sb.toString();
  }

  private static String fromStream( OutputStream outputStream )
  {
    return new String( ( ( ByteArrayOutputStream ) outputStream ).toByteArray() );
  }

  private static final String PATTERN        = "</document>";
  private static final int    PATTERN_LENGTH = PATTERN.length();
  private static final int    FIRST_CHAR     = '<';
  private static final int    EOS            = -1;

  private static void copyCxmlHeaderAndDocumentToOutput( InputStream inputStream, OutputStream outputStream )
      throws IOException
  {
    int     ch       = inputStream.read();
    int     pos      = 1;
    boolean matching = false;
    boolean eos      = ( ch == EOS );

    while( !eos )
    {
      if( !matching && ch == FIRST_CHAR )
      {
        matching = true;
      }
      else if( matching )
      {
        if( pos >= PATTERN_LENGTH )
        {
          eos = true;
          continue;
        }
        else if( ch != PATTERN.charAt( pos++ ) )
        {
          matching = false;
          pos      = 1;
        }
      }

      outputStream.write( ch );
      ch  = inputStream.read();
      eos = ( ch == EOS );
    }
  }

  public static final Relationship SUCCESS = new Relationship.Builder().name( "Success" ).description( "Success" ).build();

  private List< PropertyDescriptor > properties;
  private Set< Relationship >        relationships;

  @Override
  public void init( final ProcessorInitializationContext context )
  {
    //noinspection MismatchedQueryAndUpdateOfCollection
    List< PropertyDescriptor > properties = new ArrayList<>();
    this.properties = Collections.unmodifiableList( properties );

    Set<Relationship> relationships = new HashSet<>();
    relationships.add( SUCCESS );
    this.relationships = Collections.unmodifiableSet( relationships );
  }

  @Override public Set< Relationship >        getRelationships()                { return relationships; }
  @Override public List< PropertyDescriptor > getSupportedPropertyDescriptors() { return properties; }
}

package com.windofkeltia.processor;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import com.windofkeltia.processor.ReadSplitWrite;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import static org.junit.Assert.assertEquals;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class ReadSplitWriteTest
{
  // @formatter:off
  @Rule   public TestName name = new TestName();
  @After  public void tearDown() { }
  @Before public void setUp()
  {
    runner = TestRunners.newTestRunner( new ReadSplitWrite() );
    runner.setValidateExpressionUsage( false );
  }

  private static       TestRunner runner;
  private static final int        ONE = 1;
  private static final String     INPUT = "<cxml>\n"
                                        + "  <document>\n"
                                        + "    This is the original document.\n"
                                        + "  </document>\n"
                                        + "  <metadata>\n"
                                        + "    <date_of_service>2016-06-28 13:23</date_of_service>\n"
                                        + "  </metadata>\n"
                                        + "  <demographics>\n"
                                        + "    <date_of_birth>1980-07-01</date_of_birth>\n"
                                        + "    <age>36</age>\n"
                                        + "  </demographics>\n"
                                        + "</cxml>\n";

  @Test
  public void test() throws IOException
  {
    runner.enqueue( new ByteArrayInputStream( INPUT.getBytes( StandardCharsets.UTF_8 ) ) );
    runner.run( ONE );
    runner.assertQueueEmpty();

    List< MockFlowFile > flowfiles = runner.getFlowFilesForRelationship( ReadSplitWrite.SUCCESS );
    assertEquals( 1, flowfiles.size() );  // JUnit expects (expected, actual)

    MockFlowFile flowfile = flowfiles.get( 0 );
    String       ACTUAL   = new String( runner.getContentAsByteArray( flowfile ), StandardCharsets.UTF_8 );

    System.out.println( ACTUAL );
  }
}
