Repository: nifi Updated Branches: refs/heads/master e59cf8665 -> 0f462a7c4
NIFI-3029: QueryDatabaseTable supports max fragments property Signed-off-by: Matt Burgess <[email protected]> This closes #1213 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0f462a7c Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0f462a7c Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0f462a7c Branch: refs/heads/master Commit: 0f462a7c4907cb7bd03b7e2005692389bba15986 Parents: e59cf86 Author: Byunghwa Yun <[email protected]> Authored: Sun Nov 13 13:45:43 2016 +0900 Committer: Matt Burgess <[email protected]> Committed: Wed Dec 14 10:51:27 2016 -0500 ---------------------------------------------------------------------- .../processors/standard/QueryDatabaseTable.java | 17 ++++++++ .../standard/QueryDatabaseTableTest.java | 46 ++++++++++++++++++++ 2 files changed, 63 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0f462a7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index da3d496..00302c9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -120,6 +120,16 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .build(); + public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder() + .name("qdbt-max-frags") + .displayName("Maximum Number of Fragments") + .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " + + "This prevents OutOfMemoryError when this processor ingests huge table.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + public QueryDatabaseTable() { final Set<Relationship> r = new HashSet<>(); r.add(REL_SUCCESS); @@ -134,6 +144,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { pds.add(QUERY_TIMEOUT); pds.add(FETCH_SIZE); pds.add(MAX_ROWS_PER_FLOW_FILE); + pds.add(MAX_FRAGMENTS); pds.add(NORMALIZE_NAMES_FOR_AVRO); propDescriptors = Collections.unmodifiableList(pds); } @@ -179,6 +190,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger(); final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger(); + final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet() + ? context.getProperty(MAX_FRAGMENTS).asInteger() + : 0; final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean(); final Map<String,String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties()); @@ -283,6 +297,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { } fragmentIndex++; + if (maxFragments > 0 && fragmentIndex >= maxFragments) { + break; + } } for (int i = 0; i < resultSetFlowFiles.size(); i++) { http://git-wip-us.apache.org/repos/asf/nifi/blob/0f462a7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 8a6d0b1..3353a87 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -599,6 +599,52 @@ public class QueryDatabaseTableTest { } @Test + public void testMaxRowsPerFlowFileWithMaxFragments() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + InputStream in; + MockFlowFile mff; + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + int rowCount=0; + //create larger row set + for (int batch = 0; batch < 100; batch++) { + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + rowCount++; + } + + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "9"); + Integer maxFragments = 3; + runner.setProperty(QueryDatabaseTable.MAX_FRAGMENTS, maxFragments.toString()); + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, maxFragments); + + for (int i = 0; i < maxFragments; i++) { + mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(i); + in = new ByteArrayInputStream(mff.toByteArray()); + assertEquals(9, getNumberOfRecordsFromStream(in)); + + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(i), mff.getAttribute("fragment.index")); + assertEquals(maxFragments.toString(), mff.getAttribute("fragment.count")); + } + + runner.clearTransferState(); + } + + @Test public void testInitialMaxValue() throws ClassNotFoundException, SQLException, InitializationException, IOException { // load test data to database
