[ https://issues.apache.org/jira/browse/NIFI-626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14558431#comment-14558431 ]

Toivo Adams commented on NIFI-626:
----------------------------------

How can I test writing large data to a FlowFile using OutputStreamCallback and 
TestRunner?

When using TestRunner, the OutputStreamCallback receives a 
ByteArrayOutputStream as `out`, so the entire FlowFile content must fit in 
memory. But I need to write a much larger amount of data (tens of gigabytes), 
which should not be held in memory.

Currently it fails with:

Caused by: java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
        at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
        at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
        at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:366)
        at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:383)
        at org.apache.avro.file.DataFileWriter.writeIfBlockFull(DataFileWriter.java:328)
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
        at org.apache.nifi.processors.standard.util.JdbcCommon.convertToAvroStream(JdbcCommon.java:64)
        at org.apache.nifi.processors.standard.ExecuteSQL$1.process(ExecuteSQL.java:133)
        at org.apache.nifi.util.MockProcessSession.write(MockProcessSession.java:593)
        at org.apache.nifi.util.MockProcessSession.write(MockProcessSession.java:1)
        at org.apache.nifi.processors.standard.ExecuteSQL.onTrigger(ExecuteSQL.java:128)
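
As a minimal illustration of why this fails: ByteArrayOutputStream keeps every 
written byte in a single heap array, which it grows via Arrays.copyOf (the top 
frame of the trace above), so content larger than the heap can never fit. This 
is a standalone sketch of that behavior, independent of NiFi:

```java
import java.io.ByteArrayOutputStream;

public class HeapBufferDemo {
    public static void main(String[] args) {
        // ByteArrayOutputStream retains every written byte in a heap array,
        // growing it by copying into a larger array (Arrays.copyOf).
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        final byte[] chunk = new byte[1024];
        for (int i = 0; i < 1000; i++) {
            out.write(chunk, 0, chunk.length);
        }
        // All 1,024,000 bytes are resident in memory at once; scale this to
        // tens of gigabytes and the heap is exhausted.
        System.out.println(out.size()); // 1024000
    }
}
```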

Processor code fragment:

flowFile = session.write(flowFile, new OutputStreamCallback() {
    @Override
    public void process(final OutputStream out) throws IOException {
        try {
            final ResultSet resultSet = st.executeQuery(selectQuery);
            final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, out);
        } catch (final SQLException e) {
            // Propagate instead of swallowing the exception, so the
            // framework can handle the failure.
            throw new ProcessException(e);
        }
    }
});
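
For illustration only: one generic way to exercise a streaming callback with a 
large payload, without buffering it, is to hand it an OutputStream that counts 
and discards bytes rather than storing them. This is a hedged sketch in plain 
Java, not NiFi's API; the class name CountingNullOutputStream is my own:

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical helper: counts bytes written but stores nothing, so an
// arbitrarily large streamed write needs only O(1) heap.
class CountingNullOutputStream extends OutputStream {
    private long count = 0;

    @Override
    public void write(final int b) throws IOException {
        count++;
    }

    @Override
    public void write(final byte[] b, final int off, final int len) throws IOException {
        count += len;
    }

    public long getCount() {
        return count;
    }
}

public class StreamingWriteDemo {
    public static void main(String[] args) throws IOException {
        final CountingNullOutputStream out = new CountingNullOutputStream();
        final byte[] chunk = new byte[8192];
        // Simulate a large streamed write (e.g. Avro rows) in 8 KiB chunks:
        // logically ~8 GB pass through, but nothing is retained in memory.
        for (int i = 0; i < 1_000_000; i++) {
            out.write(chunk, 0, chunk.length);
        }
        System.out.println(out.getCount()); // 8192000000
    }
}
```

A disk-backed temporary file would serve the same purpose when the test also 
needs to verify the written content.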

and the test code:

    @Test
    public void test1() throws InitializationException {
        final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
        
        final DBCPService dbcp = new DBCPServiceSimpleImpl();
        final Map<String, String> dbcpProperties = new HashMap<>();

        runner.addControllerService("dbcp", dbcp, dbcpProperties);
        
        runner.enableControllerService(dbcp);
        runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
        
        String query = "select "
                + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
                + ", PRD.ID as ProductId, PRD.NAME as ProductName, PRD.CODE as ProductCode"
                + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as RelCode"
                + ", ROW_NUMBER() OVER () as rownr "
                + " from persons PER, products PRD, relationships REL";
        
        runner.setValidateExpressionUsage(false);
        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
        
        runner.enqueue("Hello".getBytes());

        runner.run();
        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
        runner.clearTransferState();
    }

Thanks
Toivo


> ExecuteSQL processor for executing arbitrary SQL queries
> --------------------------------------------------------
>
>                 Key: NIFI-626
>                 URL: https://issues.apache.org/jira/browse/NIFI-626
>             Project: Apache NiFi
>          Issue Type: Sub-task
>            Reporter: Toivo Adams
>            Assignee: Toivo Adams
>         Attachments: NIFI-626_25may2015.patch
>
>
> For example, the query can be:
> SELECT * FROM orders WHERE orderId = '${order.id}'
> where ${order.id} is a FlowFile attribute.
> The result will be serialized using Avro.
> Avro makes it possible to include the query ResultSet metadata (column names
> and types) in the FlowFile.
> Avro should also allow streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
