[
https://issues.apache.org/jira/browse/NIFI-626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14558431#comment-14558431
]
Toivo Adams commented on NIFI-626:
----------------------------------
How can I test writing large data to a FlowFile using OutputStreamCallback and
TestRunner?
When using TestRunner, OutputStreamCallback is handed a ByteArrayOutputStream
as "out", so the entire FlowFile content must fit in memory. But I need to
write a much larger amount of data (tens of gigabytes), which should not be
held in memory.
Currently this fails with:
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
    at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
    at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:366)
    at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:383)
    at org.apache.avro.file.DataFileWriter.writeIfBlockFull(DataFileWriter.java:328)
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
    at org.apache.nifi.processors.standard.util.JdbcCommon.convertToAvroStream(JdbcCommon.java:64)
    at org.apache.nifi.processors.standard.ExecuteSQL$1.process(ExecuteSQL.java:133)
    at org.apache.nifi.util.MockProcessSession.write(MockProcessSession.java:593)
    at org.apache.nifi.util.MockProcessSession.write(MockProcessSession.java:1)
    at org.apache.nifi.processors.standard.ExecuteSQL.onTrigger(ExecuteSQL.java:128)
Processor code fragment:

flowFile = session.write(flowFile, new OutputStreamCallback() {
    @Override
    public void process(final OutputStream out) throws IOException {
        try {
            final ResultSet resultSet = st.executeQuery(selectQuery);
            final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, out);
        } catch (final SQLException e) {
            throw new ProcessException(e);
        }
    }
});
and the test code:

@Test
public void test1() throws InitializationException {
    final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);

    final DBCPService dbcp = new DBCPServiceSimpleImpl();
    final Map<String, String> dbcpProperties = new HashMap<>();
    runner.addControllerService("dbcp", dbcp, dbcpProperties);
    runner.enableControllerService(dbcp);
    runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");

    final String query = "select"
            + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
            + ", PRD.ID as ProductId, PRD.NAME as ProductName, PRD.CODE as ProductCode"
            + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
            + ", ROW_NUMBER() OVER () as rownr"
            + " from persons PER, products PRD, relationships REL";

    runner.setValidateExpressionUsage(false);
    runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
    runner.enqueue("Hello".getBytes());
    runner.run();

    runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
    runner.clearTransferState();
}
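One way to keep such a test inside the heap limit (a sketch, not something from
this ticket) is to skip MockProcessSession's in-memory buffering and unit-test
JdbcCommon.convertToAvroStream directly against an OutputStream that discards
bytes while only counting them. NullCountingOutputStream below is a
hypothetical helper; it is exercised here with plain byte writes, since wiring
up a live ResultSet is out of scope:

```java
import java.io.IOException;
import java.io.OutputStream;

// A sink that discards everything written to it, tracking only the byte count.
// Unlike the ByteArrayOutputStream that MockProcessSession supplies to
// OutputStreamCallback, it uses constant memory regardless of volume, so
// JdbcCommon.convertToAvroStream(resultSet, sink) could stream through it
// without heap growth.
class NullCountingOutputStream extends OutputStream {
    private long count;

    @Override
    public void write(final int b) {
        count++;
    }

    @Override
    public void write(final byte[] b, final int off, final int len) {
        count += len;
    }

    public long getCount() {
        return count;
    }
}

public class NullSinkDemo {
    public static void main(final String[] args) throws IOException {
        final NullCountingOutputStream sink = new NullCountingOutputStream();
        final byte[] chunk = new byte[1024 * 1024]; // 1 MiB buffer, reused
        for (int i = 0; i < 1024; i++) {            // "write" 1 GiB in total
            sink.write(chunk);
        }
        System.out.println(sink.getCount());        // prints 1073741824
    }
}
```

Since only the running count is retained, tens of gigabytes can pass through in
constant memory, and the test can then assert on the returned nrOfRows or the
byte count rather than on the content itself.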
Thanks
Toivo
> ExecuteSQL processor for executing arbitrary SQL queries
> --------------------------------------------------------
>
> Key: NIFI-626
> URL: https://issues.apache.org/jira/browse/NIFI-626
> Project: Apache NiFi
> Issue Type: Sub-task
> Reporter: Toivo Adams
> Assignee: Toivo Adams
> Attachments: NIFI-626_25may2015.patch
>
>
> For example, the query can be:
> SELECT * FROM orders WHERE orderId = '${order.id}'
> where ${order.id} is a FlowFile attribute.
> The result will be serialized using Avro.
> Avro gives us the possibility to include the query ResultSet metadata (column
> names and types) in the FlowFile.
> Also, Avro should allow streaming.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)