Repository: nifi Updated Branches: refs/heads/master fdd8cdbb3 -> d8d220ccb
NIFI-5601: Add fragment.* attributes to GenerateTableFetch Signed-off-by: Peter Wicks <patric...@gmail.com> This closes #3074 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d8d220cc Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d8d220cc Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d8d220cc Branch: refs/heads/master Commit: d8d220ccb86d1797f56f34649d70a1acff278eb5 Parents: fdd8cdb Author: Matthew Burgess <mattyb...@apache.org> Authored: Mon Oct 15 16:07:13 2018 -0400 Committer: patricker <patric...@gmail.com> Committed: Thu Nov 1 15:14:29 2018 -0600 ---------------------------------------------------------------------- .../AbstractDatabaseFetchProcessor.java | 4 +++ .../standard/AbstractQueryDatabaseTable.java | 6 +--- .../processors/standard/GenerateTableFetch.java | 33 +++++++++++++++----- .../standard/TestGenerateTableFetch.java | 17 +++++++++- 4 files changed, 47 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d8d220cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index 3da8a73..a99ca6a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -23,6 +23,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; @@ -89,6 +90,9 @@ import static java.sql.Types.VARCHAR; public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFactoryProcessor { public static final String INITIAL_MAX_VALUE_PROP_START = "initial.maxvalue."; + public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key(); // Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/d8d220cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java index 06df6c1..57933b3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java @@ -27,7 +27,6 @@ import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.FragmentAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -66,9 +65,6 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr public static final String RESULT_TABLENAME = "tablename"; public static final String RESULT_ROW_COUNT = "querydbtable.row.count"; - public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); - public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); - public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() .name("Fetch Size") .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be " @@ -338,7 +334,7 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr //set count on all FlowFiles if (maxRowsPerFlowFile > 0) { resultSetFlowFiles.set(i, - session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex))); + session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex))); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d8d220cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index 49779e9..a547393 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -65,6 +65,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -97,7 +98,16 @@ import java.util.stream.IntStream; @WritesAttribute(attribute = "generatetablefetch.maxColumnNames", description = "The comma-separated list of column names used to keep track of data " + "that has been returned since the processor started running."), @WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."), - @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.") + @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition."), + @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles generated from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "This is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."), + @WritesAttribute(attribute="fragment.index", description="This is the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all generated from the same execution. This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same execution and in what order " + + "FlowFiles were produced"), }) @DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "Specifies an initial " @@ -426,6 +436,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { // Generate SQL statements to read "pages" of data Long limit = partitionSize == 0 ? null : (long) partitionSize; + final String fragmentIdentifier = UUID.randomUUID().toString(); for (long i = 0; i < numberOfFetches; i++) { // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit) if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) { @@ -442,20 +453,28 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning); FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes())); - sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName); + Map<String,String> attributesToAdd = new HashMap<>(); + + attributesToAdd.put("generatetablefetch.tableName", tableName); if (columnNames != null) { - sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.columnNames", columnNames); + attributesToAdd.put("generatetablefetch.columnNames", columnNames); } if (StringUtils.isNotBlank(whereClause)) { - sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.whereClause", whereClause); + attributesToAdd.put("generatetablefetch.whereClause", whereClause); } if (StringUtils.isNotBlank(maxColumnNames)) { - sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.maxColumnNames", maxColumnNames); + attributesToAdd.put("generatetablefetch.maxColumnNames", maxColumnNames); } - sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.limit", String.valueOf(limit)); + attributesToAdd.put("generatetablefetch.limit", String.valueOf(limit)); if (partitionSize != 0) { - sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.offset", String.valueOf(offset)); + attributesToAdd.put("generatetablefetch.offset", String.valueOf(offset)); } + // Add fragment attributes + attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier); + attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i)); + attributesToAdd.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches)); + + sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributesToAdd); session.transfer(sqlFlowFile, REL_SUCCESS); } http://git-wip-us.apache.org/repos/asf/nifi/blob/d8d220cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 44dcadf..6e0c397 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -44,9 +44,13 @@ import java.sql.SQLNonTransientConnectionException; import java.sql.Statement; import java.sql.Types; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.DB_TYPE; +import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.FRAGMENT_COUNT; +import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.FRAGMENT_ID; +import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.FRAGMENT_INDEX; import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.REL_SUCCESS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -114,7 +118,7 @@ public class TestGenerateTableFetch { } @Test - public void testAddedRows() throws ClassNotFoundException, SQLException, InitializationException, IOException { + public void testAddedRows() throws SQLException, IOException { // load test data to database final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); @@ -140,6 +144,8 @@ public class TestGenerateTableFetch { MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); String query = new String(flowFile.toByteArray()); assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query); + flowFile.assertAttributeEquals(FRAGMENT_INDEX, "0"); + flowFile.assertAttributeEquals(FRAGMENT_COUNT, "1"); ResultSet resultSet = stmt.executeQuery(query); // Should be three records assertTrue(resultSet.next()); @@ -160,6 +166,15 @@ public class TestGenerateTableFetch { runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2"); runner.run(); runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2); + // Check fragment attributes + List<MockFlowFile> resultFFs = runner.getFlowFilesForRelationship(REL_SUCCESS); + MockFlowFile ff1 = resultFFs.get(0); + MockFlowFile ff2 = resultFFs.get(1); + assertEquals(ff1.getAttribute(FRAGMENT_ID), ff2.getAttribute(FRAGMENT_ID)); + assertEquals(ff1.getAttribute(FRAGMENT_INDEX), "0"); + assertEquals(ff1.getAttribute(FRAGMENT_COUNT), "2"); + assertEquals(ff2.getAttribute(FRAGMENT_INDEX), "1"); + assertEquals(ff2.getAttribute(FRAGMENT_COUNT), "2"); // Verify first flow file's contents flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);