http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java new file mode 100644 index 0000000..31d0ec8 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processors.standard.sql.RecordSqlWriter; +import org.apache.nifi.processors.standard.sql.SqlWriter; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES; + +@EventDriven +@InputRequirement(Requirement.INPUT_ALLOWED) +@Tags({"sql", "select", "jdbc", "query", "database", "record"}) +@CapabilityDescription("Executes the provided SQL select query. The query result will be converted to the format specified by a Record Writer. " + "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " + "a timer or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile.
" + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + "select query, and the query may use the ? character as a parameter placeholder. In this case, the parameters to use must exist as FlowFile attributes " + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be " + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format. " + "FlowFile attribute 'executesql.row.count' indicates how many rows were selected.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer " + "that represents the JDBC Type of the parameter."), + @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The values of the Parameters are specified as " + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), + @ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. " + "Incoming FlowFiles are expected to be parameterized SQL statements. In some cases " + "a format option needs to be specified; currently this is only applicable to binary data types, dates, times, and timestamps. Binary Data Types (defaults to 'ascii') - " + "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. " + "base64: the string is a Base64 encoded string that can be decoded to bytes. " + "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. " + "Dates/Times/Timestamps - " + "Date, Time and Timestamp formats all support both custom formats and named formats ('yyyy-MM-dd', 'ISO_OFFSET_DATE_TIME') " + "as specified according to java.time.format.DateTimeFormatter. " + "If not specified, a long value input is expected to be a unix epoch (milliseconds since 1970-01-01), or a string value in " + "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines e.g. Derby or MySQL do not support milliseconds and will truncate milliseconds), " + "or 'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp.") +}) +@WritesAttributes({ + @WritesAttribute(attribute = "executesql.row.count", description = "Contains the number of rows returned in the select query"), + @WritesAttribute(attribute = "executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds"), + @WritesAttribute(attribute = "executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"), + @WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"), + @WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, " + "the zero-based index of this result set."), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + "will have the same value for the fragment.identifier attribute.
This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this " + "attribute will not be populated."), + @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + "FlowFiles were produced"), + @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."), + @WritesAttribute(attribute = "record.count", description = "The number of records output by the Record Writer.") +}) +public class ExecuteSQLRecord extends AbstractExecuteSQL { + + + public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder() + .name("esqlrecord-record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. " + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + public static final PropertyDescriptor NORMALIZE_NAMES = new PropertyDescriptor.Builder() + .name("esqlrecord-normalize") + .displayName("Normalize Table/Column Names") + .description("Whether to change characters in column names.
For example, colons and periods will be changed to underscores.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public ExecuteSQLRecord() { + final Set<Relationship> r = new HashSet<>(); + r.add(REL_SUCCESS); + r.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(r); + + final List<PropertyDescriptor> pds = new ArrayList<>(); + pds.add(DBCP_SERVICE); + pds.add(SQL_SELECT_QUERY); + pds.add(QUERY_TIMEOUT); + pds.add(RECORD_WRITER_FACTORY); + pds.add(NORMALIZE_NAMES); + pds.add(USE_AVRO_LOGICAL_TYPES); + pds.add(MAX_ROWS_PER_FLOW_FILE); + pds.add(OUTPUT_BATCH_SIZE); + propDescriptors = Collections.unmodifiableList(pds); + } + + @Override + protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) { + final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); + final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES).asBoolean(); + final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean(); + final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder() + .convertNames(convertNamesForAvro) + .useLogicalTypes(useAvroLogicalTypes) + .build(); + final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class); + + return new RecordSqlWriter(recordSetWriterFactory, options, maxRowsPerFlowFile, fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes()); + } +}
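For illustration of the sql.args.N.* convention documented in the capability description above, the following self-contained sketch (the query, types, and values are hypothetical, not part of this commit) shows the attributes a FlowFile would carry to bind two parameters of a statement such as SELECT * FROM users WHERE id = ? AND created_at > ?

import java.sql.Types;
import java.util.HashMap;
import java.util.Map;

public class SqlArgsAttributesExample {
    public static void main(String[] args) {
        final Map<String, String> attributes = new HashMap<>();
        // Parameter 1: an INTEGER (java.sql.Types.INTEGER == 4)
        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
        attributes.put("sql.args.1.value", "42");
        // Parameter 2: a TIMESTAMP (java.sql.Types.TIMESTAMP == 93), with an explicit
        // format supplied via the optional sql.args.N.format attribute described above
        attributes.put("sql.args.2.type", String.valueOf(Types.TIMESTAMP));
        attributes.put("sql.args.2.value", "2018-10-01 12:00:00.000");
        attributes.put("sql.args.2.format", "yyyy-MM-dd HH:mm:ss.SSS");
        attributes.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}

The integer type codes come from java.sql.Types, which is what "a number indicating the JDBC Type" refers to.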
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 1923e2c..71348ef 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.standard; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -27,49 +26,21 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; -import org.apache.nifi.components.state.StateManager; -import org.apache.nifi.components.state.StateMap; -import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.flowfile.attributes.FragmentAttributes; -import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.db.DatabaseAdapter; +import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter; +import org.apache.nifi.processors.standard.sql.SqlWriter; import org.apache.nifi.processors.standard.util.JdbcCommon; -import org.apache.nifi.util.StopWatch; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; -import java.text.ParseException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.IntStream; import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION; import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE; @@ -112,60 +83,7 @@ import static
org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC @DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column", expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should " + "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).") -public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { - - public static final String RESULT_TABLENAME = "tablename"; - public static final String RESULT_ROW_COUNT = "querydbtable.row.count"; - - public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); - public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); - - public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() - .name("Fetch Size") - .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be " - + "honored and/or exact. If the value specified is zero, then the hint is ignored.") - .defaultValue("0") - .required(true) - .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder() - .name("qdbt-max-rows") - .displayName("Max Rows Per Flow File") - .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large " - + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.") - .defaultValue("0") - .required(true) - .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder() - .name("qdbt-output-batch-size") - .displayName("Output Batch Size") - .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows " - + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles " - + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will " - + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this " - + "property is set.") - .defaultValue("0") - .required(true) - .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); - - public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder() - .name("qdbt-max-frags") - .displayName("Maximum Number of Fragments") - .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " + - "This prevents OutOfMemoryError when this processor ingests huge table.
NOTE: Setting this property can result in data loss, as the incoming results are " - + "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.") - .defaultValue("0") - .required(true) - .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); +public class QueryDatabaseTable extends AbstractQueryDatabaseTable { public QueryDatabaseTable() { final Set<Relationship> r = new HashSet<>(); @@ -197,365 +115,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { } @Override - public Set<Relationship> getRelationships() { - return relationships; - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return propDescriptors; - } - - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .required(false) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) - .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .dynamic(true) - .build(); - } - - @OnScheduled - public void setup(final ProcessContext context) { - maxValueProperties = getDefaultMaxValueProperties(context, null); - } - - @OnStopped - public void stop() { - // Reset the column type map in case properties change - setupComplete.set(false); - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { - // Fetch the column/table info once - if (!setupComplete.get()) { - super.setup(context); - } - ProcessSession session = sessionFactory.createSession(); - final List<FlowFile> resultSetFlowFiles = new ArrayList<>(); - - final ComponentLog logger = getLogger(); - - final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); - final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); + protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context) { final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); - final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue(); - final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue(); - final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue(); - final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue(); - final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger(); + final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean(); + final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean(); final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); - final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger(); - final int outputBatchSize = outputBatchSizeField == null ?
0 : outputBatchSizeField; - final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet() - ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger() - : 0; + final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger(); + final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger(); + final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder() .recordName(tableName) + .convertNames(convertNamesForAvro) + .useLogicalTypes(useAvroLogicalTypes) + .defaultPrecision(defaultPrecision) + .defaultScale(defaultScale) .maxRows(maxRowsPerFlowFile) - .convertNames(context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean()) - .useLogicalTypes(context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean()) - .defaultPrecision(context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger()) - .defaultScale(context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger()) .build(); - - final StateManager stateManager = context.getStateManager(); - final StateMap stateMap; - - try { - stateMap = stateManager.getState(Scope.CLUSTER); - } catch (final IOException ioe) { - getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform " - + "query until this is accomplished.", ioe); - context.yield(); - return; - } - // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually - // set as the current state map (after the session has been committed) - final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap()); - - //If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map - for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) { - String maxPropKey = maxProp.getKey().toLowerCase(); - String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey, dbAdapter); - if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) { - String newMaxPropValue; - // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme) - // the value has been stored under a key that is only the column name. Fall back to check the column name, - // but store the new initial max value under the fully-qualified key. - if (statePropertyMap.containsKey(maxPropKey)) { - newMaxPropValue = statePropertyMap.get(maxPropKey); - } else { - newMaxPropValue = maxProp.getValue(); - } - statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue); - - } - } - - List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames) - ?
null - : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*")); - final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap); - final StopWatch stopWatch = new StopWatch(true); - final String fragmentIdentifier = UUID.randomUUID().toString(); - - try (final Connection con = dbcpService.getConnection(Collections.emptyMap()); - final Statement st = con.createStatement()) { - - if (fetchSize != null && fetchSize > 0) { - try { - st.setFetchSize(fetchSize); - } catch (SQLException se) { - // Not all drivers support this, just log the error (at debug level) and move on - logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se); - } - } - - String jdbcURL = "DBCPService"; - try { - DatabaseMetaData databaseMetaData = con.getMetaData(); - if (databaseMetaData != null) { - jdbcURL = databaseMetaData.getURL(); - } - } catch (SQLException se) { - // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly - } - - final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue(); - st.setQueryTimeout(queryTimeout); // timeout in seconds - if (logger.isDebugEnabled()) { - logger.debug("Executing query {}", new Object[] { selectQuery }); - } - try (final ResultSet resultSet = st.executeQuery(selectQuery)) { - int fragmentIndex=0; - // Max values will be updated in the state property map by the callback - final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter); - - while(true) { - final AtomicLong nrOfRows = new AtomicLong(0L); - - FlowFile fileToProcess = session.create(); - try { - fileToProcess = session.write(fileToProcess, out -> { - try { - nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, maxValCollector)); - } catch (SQLException | RuntimeException e) { - throw new ProcessException("Error during database query or conversion of records to Avro.", e); - } - }); - } catch (ProcessException e) { - // Add flowfile to results before rethrowing so it will be removed from session in outer catch - resultSetFlowFiles.add(fileToProcess); - throw e; - } - - if (nrOfRows.get() > 0) { - // set attribute how many rows were selected - fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); - fileToProcess = session.putAttribute(fileToProcess, RESULT_TABLENAME, tableName); - fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); - if(maxRowsPerFlowFile > 0) { - fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_ID, fragmentIdentifier); - fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_INDEX, String.valueOf(fragmentIndex)); - } - - logger.info("{} contains {} Avro records; transferring to 'success'", - new Object[]{fileToProcess, nrOfRows.get()}); - - session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - resultSetFlowFiles.add(fileToProcess); - // If we've reached the batch size, send out the flow files - if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) { - session.transfer(resultSetFlowFiles, REL_SUCCESS); - session.commit(); - resultSetFlowFiles.clear(); - } - } else { - // If there were no rows returned, don't send the flowfile -
session.remove(fileToProcess); - // If no rows and this was first FlowFile, yield - if(fragmentIndex == 0){ - context.yield(); - } - break; - } - - fragmentIndex++; - if (maxFragments > 0 && fragmentIndex >= maxFragments) { - break; - } - - // If we aren't splitting up the data into flow files or fragments, then the result set has been entirely fetched so don't loop back around - if (maxFragments == 0 && maxRowsPerFlowFile == 0) { - break; - } - - // If we are splitting up the data into flow files, don't loop back around if we've gotten all results - if(maxRowsPerFlowFile > 0 && nrOfRows.get() < maxRowsPerFlowFile) { - break; - } - } - - // Apply state changes from the Max Value tracker - maxValCollector.applyStateChanges(); - - // Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes - if (outputBatchSize == 0) { - for (int i = 0; i < resultSetFlowFiles.size(); i++) { - // Add maximum values as attributes - for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) { - // Get just the column name from the key - String key = entry.getKey(); - String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length()); - resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue())); - } - - //set count on all FlowFiles - if (maxRowsPerFlowFile > 0) { - resultSetFlowFiles.set(i, - session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex))); - } - } - } - } catch (final SQLException e) { - throw e; - } - - session.transfer(resultSetFlowFiles, REL_SUCCESS); - - } catch (final ProcessException | SQLException e) { - logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e}); - if (!resultSetFlowFiles.isEmpty()) { - session.remove(resultSetFlowFiles); - } - context.yield(); - } finally { - session.commit(); - try { - // Update the state - stateManager.setState(statePropertyMap, Scope.CLUSTER); - } catch (IOException ioe) { - getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe}); - } - } - } - - protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames, - String customWhereClause, Map<String, String> stateMap) { - - return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap); - } - - protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames, - String customWhereClause, Map<String, String> stateMap) { - if (StringUtils.isEmpty(tableName)) { - throw new IllegalArgumentException("Table name must be specified"); - } - final StringBuilder query; - - if (StringUtils.isEmpty(sqlQuery)) { - query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null)); - } else { - query = getWrappedQuery(dbAdapter, sqlQuery, tableName); - } - - List<String> whereClauses = new ArrayList<>(); - // Check state map for last max values - if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) { - IntStream.range(0, maxValColumnNames.size()).forEach((index) -> { - String colName = maxValColumnNames.get(index); - String maxValueKey = getStateKey(tableName, colName, dbAdapter); - String maxValue = stateMap.get(maxValueKey); - if
(StringUtils.isEmpty(maxValue)) { - // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme) - // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new - // maximum value is observed, it will be stored under the fully-qualified key from then on. - maxValue = stateMap.get(colName.toLowerCase()); - } - if (!StringUtils.isEmpty(maxValue)) { - Integer type = columnTypeMap.get(maxValueKey); - if (type == null) { - // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled. - throw new IllegalArgumentException("No column type found for: " + colName); - } - // Add a condition for the WHERE clause - whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName())); - } - }); - } - - if (customWhereClause != null) { - whereClauses.add("(" + customWhereClause + ")"); - } - - if (!whereClauses.isEmpty()) { - query.append(" WHERE "); - query.append(StringUtils.join(whereClauses, " AND ")); - } - - return query.toString(); - } - - protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback { - DatabaseAdapter dbAdapter; - final Map<String, String> newColMap; - final Map<String, String> originalState; - String tableName; - - public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) { - this.dbAdapter = dbAdapter; - this.originalState = stateMap; - - this.newColMap = new HashMap<>(); - this.newColMap.putAll(stateMap); - - this.tableName = tableName; - } - - @Override - public void processRow(ResultSet resultSet) throws IOException { - if (resultSet == null) { - return; - } - try { - // Iterate over the row, check-and-set max values - final ResultSetMetaData meta = resultSet.getMetaData(); - final int nrOfColumns = meta.getColumnCount(); - if (nrOfColumns > 0) { - for (int i = 1; i <= nrOfColumns; i++) { - String colName = meta.getColumnName(i).toLowerCase(); - String fullyQualifiedMaxValueKey = getStateKey(tableName, colName, dbAdapter); - Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey); - // Skip any columns we're not keeping track of or whose value is null - if (type == null || resultSet.getObject(i) == null) { - continue; - } - String maxValueString = newColMap.get(fullyQualifiedMaxValueKey); - // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme) - // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new - // maximum value is observed, it will be stored under the fully-qualified key from then on.
- if (StringUtils.isEmpty(maxValueString)) { - maxValueString = newColMap.get(colName); - } - String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName()); - if (newMaxValueString != null) { - newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString); - } - } - } - } catch (ParseException | SQLException e) { - throw new IOException(e); - } - } - - @Override - public void applyStateChanges() { - this.originalState.putAll(this.newColMap); - } + return new DefaultAvroSqlWriter(options); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java new file mode 100644 index 0000000..ea89256 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processors.standard.sql.RecordSqlWriter; +import org.apache.nifi.processors.standard.sql.SqlWriter; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES; + + +@TriggerSerially +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "select", "jdbc", "query", "database", "record"}) +@SeeAlso({GenerateTableFetch.class, ExecuteSQL.class}) +@CapabilityDescription("Generates a SQL select query, or uses a provided statement, and executes it to fetch all rows whose values in the specified " + "Maximum Value column(s) are larger than the " + "previously-seen maxima. Query result will be converted to the format specified by the record writer. Expression Language is supported for several properties, but no incoming " + "connections are permitted. The Variable Registry may be used to provide values for any property containing Expression Language. If it is desired to " + "leverage flow file attributes to perform these queries, the GenerateTableFetch and/or ExecuteSQL processors can be used for this purpose. " + "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " + "a timer or cron expression, using the standard scheduling methods. This processor is intended to be run on the Primary Node only. FlowFile attribute " + "'querydbtable.row.count' indicates how many rows were selected.") +@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for " + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor " + "to fetch only those records that have max values greater than the retained values. This can be used for " + "incremental fetching, fetching of newly added rows, etc.
To clear the maximum values, clear the state of the processor " + "per the State Management documentation") +@WritesAttributes({ + @WritesAttribute(attribute = "tablename", description="Name of the table being queried"), + @WritesAttribute(attribute = "querydbtable.row.count", description="The number of rows selected by the query"), + @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this " + "attribute will not be populated."), + @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + "FlowFiles were produced"), + @WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The " + "suffix of the attribute is the name of the column. If Output Batch Size is set, then this attribute will not be populated."), + @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."), + @WritesAttribute(attribute = "record.count", description = "The number of records output by the Record Writer.") +}) +@DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column", + expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should " + + "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).") +public class QueryDatabaseTableRecord extends AbstractQueryDatabaseTable { + + public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder() + .name("qdbtr-record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. " + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + public static final PropertyDescriptor NORMALIZE_NAMES = new PropertyDescriptor.Builder() + .name("qdbtr-normalize") + .displayName("Normalize Table/Column Names") + .description("Whether to change characters in column names when creating the output schema.
For example, colons and periods will be changed to underscores.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public QueryDatabaseTableRecord() { + final Set<Relationship> r = new HashSet<>(); + r.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(r); + + final List<PropertyDescriptor> pds = new ArrayList<>(); + pds.add(DBCP_SERVICE); + pds.add(DB_TYPE); + pds.add(new PropertyDescriptor.Builder() + .fromPropertyDescriptor(TABLE_NAME) + .description("The name of the database table to be queried. When a custom query is used, this property is used to alias the query and appears as an attribute on the FlowFile.") + .build()); + pds.add(COLUMN_NAMES); + pds.add(WHERE_CLAUSE); + pds.add(SQL_QUERY); + pds.add(RECORD_WRITER_FACTORY); + pds.add(MAX_VALUE_COLUMN_NAMES); + pds.add(QUERY_TIMEOUT); + pds.add(FETCH_SIZE); + pds.add(MAX_ROWS_PER_FLOW_FILE); + pds.add(OUTPUT_BATCH_SIZE); + pds.add(MAX_FRAGMENTS); + pds.add(NORMALIZE_NAMES); + pds.add(USE_AVRO_LOGICAL_TYPES); + + propDescriptors = Collections.unmodifiableList(pds); + } + + @Override + protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context) { + final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); + final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES).asBoolean(); + final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean(); + final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder() + .convertNames(convertNamesForAvro) + .useLogicalTypes(useAvroLogicalTypes) + .build(); + final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class); + + return new RecordSqlWriter(recordSetWriterFactory, options, maxRowsPerFlowFile, Collections.emptyMap()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java new file mode 100644 index 0000000..574aca7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.sql; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable; +import org.apache.nifi.processors.standard.util.JdbcCommon; + +import java.io.IOException; +import java.io.OutputStream; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class DefaultAvroSqlWriter implements SqlWriter { + + private final JdbcCommon.AvroConversionOptions options; + + private final Map<String,String> attributesToAdd = new HashMap<String,String>() {{ + put(CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); + }}; + + public DefaultAvroSqlWriter(JdbcCommon.AvroConversionOptions options) { + this.options = options; + } + + @Override + public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception { + try { + return JdbcCommon.convertToAvroStream(resultSet, outputStream, options, callback); + } catch (SQLException e) { + throw new ProcessException(e); + } + } + + @Override + public Map<String, String> getAttributesToAdd() { + return attributesToAdd; + } + + @Override + public void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException { + JdbcCommon.createEmptyAvroStream(outputStream); + } + + @Override + public String getMimeType() { + return JdbcCommon.MIME_TYPE_AVRO_BINARY; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java new file mode 100644 index 0000000..c1a76b4 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processors.standard.sql; + +import org.apache.avro.Schema; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.serialization.record.ResultSetRecordSet; + +import java.io.IOException; +import java.io.OutputStream; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class RecordSqlWriter implements SqlWriter { + + private final RecordSetWriterFactory recordSetWriterFactory; + private final AtomicReference<WriteResult> writeResultRef; + private final JdbcCommon.AvroConversionOptions options; + private final int maxRowsPerFlowFile; + private final Map<String, String> originalAttributes; + private ResultSetRecordSet fullRecordSet; + private RecordSchema writeSchema; + private String mimeType; + + public RecordSqlWriter(RecordSetWriterFactory recordSetWriterFactory, JdbcCommon.AvroConversionOptions options, int maxRowsPerFlowFile, Map<String, String> originalAttributes) { + this.recordSetWriterFactory = recordSetWriterFactory; + this.writeResultRef = new AtomicReference<>(); + this.maxRowsPerFlowFile = maxRowsPerFlowFile; + this.options = options; + this.originalAttributes = originalAttributes; + } + + @Override + public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception { + final RecordSet recordSet; + try { + if (fullRecordSet == null) { + final Schema avroSchema = JdbcCommon.createSchema(resultSet, options); + final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema); + fullRecordSet = new ResultSetRecordSetWithCallback(resultSet, recordAvroSchema, callback); + writeSchema = recordSetWriterFactory.getSchema(originalAttributes, fullRecordSet.getSchema()); + } + recordSet = (maxRowsPerFlowFile > 0) ?
fullRecordSet.limit(maxRowsPerFlowFile) : fullRecordSet; + + } catch (final SQLException | SchemaNotFoundException | IOException e) { + throw new ProcessException(e); + } + try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream)) { + writeResultRef.set(resultSetWriter.write(recordSet)); + if (mimeType == null) { + mimeType = resultSetWriter.getMimeType(); + } + return writeResultRef.get().getRecordCount(); + } catch (final Exception e) { + throw new IOException(e); + } + } + + @Override + public Map<String, String> getAttributesToAdd() { + Map<String, String> attributesToAdd = new HashMap<>(); + attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeType); + + // Add any attributes from the record writer (if present) + final WriteResult result = writeResultRef.get(); + if (result != null) { + if (result.getAttributes() != null) { + attributesToAdd.putAll(result.getAttributes()); + } + + attributesToAdd.put("record.count", String.valueOf(result.getRecordCount())); + } + return attributesToAdd; + } + + @Override + public void updateCounters(ProcessSession session) { + final WriteResult result = writeResultRef.get(); + if (result != null) { + session.adjustCounter("Records Written", result.getRecordCount(), false); + } + } + + @Override + public void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException { + try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream)) { + mimeType = resultSetWriter.getMimeType(); + resultSetWriter.beginRecordSet(); + resultSetWriter.finishRecordSet(); + } catch (final Exception e) { + throw new IOException(e); + } + } + + @Override + public String getMimeType() { + return mimeType; + } + + private static class ResultSetRecordSetWithCallback extends ResultSetRecordSet { + + private final AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback; + + ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema readerSchema, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws SQLException { + super(rs, readerSchema); + this.callback = callback; + } + + @Override + public Record next() throws IOException { + try { + if (hasMoreRows()) { + ResultSet rs = getResultSet(); + final Record record = createRecord(rs); + if (callback != null) { + callback.processRow(rs); + } + setMoreRows(rs.next()); + return record; + } else { + return null; + } + } catch (final SQLException e) { + throw new IOException("Could not obtain next record from ResultSet", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java new file mode 100644 index 0000000..08fc3fd --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.sql; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable; + +import java.io.IOException; +import java.io.OutputStream; +import java.sql.ResultSet; +import java.util.Collections; +import java.util.Map; + +/** + * The SqlWriter interface provides a standard way for processors such as ExecuteSQL, ExecuteSQLRecord, QueryDatabaseTable, and QueryDatabaseTableRecord + * to write SQL result sets out to a flow file in whichever manner is appropriate. For example, ExecuteSQL writes the result set as Avro but ExecuteSQLRecord + * uses the Record API to write the result set out as prescribed by the selected RecordSetWriter. + */ +public interface SqlWriter { + + /** + * Writes the given result set out to the given output stream, possibly applying a callback as each row is processed. + * @param resultSet the ResultSet to be written + * @param outputStream the OutputStream to write the result set to + * @param logger a common logger that can be used to log messages during write + * @param callback a MaxValueResultSetRowCollector that may be called as each row in the ResultSet is processed + * @return the number of rows written to the output stream + * @throws Exception if any errors occur during the writing of the result set to the output stream + */ + long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception; + + /** + * Returns a map of attribute key/value pairs to be added to any outgoing flow file(s). The default implementation is to return an empty map. + * @return a map of attribute key/value pairs + */ + default Map<String, String> getAttributesToAdd() { + return Collections.emptyMap(); + } + + /** + * Updates any session counters as a result of processing result sets. The default implementation is empty; no counters will be updated. + * @param session the session upon which to update counters + */ + default void updateCounters(ProcessSession session) { + } + + /** + * Writes an empty result set to the output stream. In some cases a ResultSet might not have any viable rows, but will throw an error or + * behave unexpectedly if an attempt is made to retrieve them. This method indicates that the implementation should write whatever output is + * appropriate for a result set with no rows.
+ * @param outputStream the OutputStream to write the empty result set to + * @param logger a common logger that can be used to log messages during write + * @throws IOException if any errors occur during the writing of an empty result set to the output stream + */ + void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException; + + /** + * Returns the MIME type of the output format. This can be used in FlowFile attributes or to perform format-specific processing as necessary. + * @return the MIME type string of the output format. + */ + String getMimeType(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index d21b7f4..bfe1403 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -35,6 +35,7 @@ org.apache.nifi.processors.standard.EvaluateXPath org.apache.nifi.processors.standard.EvaluateXQuery org.apache.nifi.processors.standard.ExecuteProcess org.apache.nifi.processors.standard.ExecuteSQL +org.apache.nifi.processors.standard.ExecuteSQLRecord org.apache.nifi.processors.standard.ExecuteStreamCommand org.apache.nifi.processors.standard.ExtractGrok org.apache.nifi.processors.standard.ExtractText @@ -96,6 +97,7 @@ org.apache.nifi.processors.standard.PutSyslog org.apache.nifi.processors.standard.PutTCP org.apache.nifi.processors.standard.PutUDP org.apache.nifi.processors.standard.QueryDatabaseTable +org.apache.nifi.processors.standard.QueryDatabaseTableRecord org.apache.nifi.processors.standard.QueryRecord org.apache.nifi.processors.standard.ReplaceText org.apache.nifi.processors.standard.ReplaceTextWithMapping
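To make the contract of the new SqlWriter interface concrete, here is a minimal sketch of a hypothetical CsvSqlWriter (not part of this commit) that emits one comma-separated line per row. It assumes only the interface and the MaxValueResultSetRowCollector callback type shown above; a production writer would also need proper CSV escaping.

package org.apache.nifi.processors.standard.sql;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;

import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;

public class CsvSqlWriter implements SqlWriter {

    @Override
    public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger,
                               AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
        final ResultSetMetaData meta = resultSet.getMetaData();
        final int columnCount = meta.getColumnCount();
        final Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8);
        long rows = 0;
        while (resultSet.next()) {
            final StringBuilder line = new StringBuilder();
            for (int i = 1; i <= columnCount; i++) {
                if (i > 1) {
                    line.append(',');
                }
                line.append(resultSet.getObject(i));
            }
            writer.write(line.append('\n').toString());
            if (callback != null) {
                // Let the max-value tracker observe each row, as the Avro and Record writers do
                callback.processRow(resultSet);
            }
            rows++;
        }
        writer.flush();
        return rows;
    }

    @Override
    public void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) {
        // An empty CSV result set is simply an empty stream
    }

    @Override
    public String getMimeType() {
        return "text/csv";
    }
}

Because getAttributesToAdd() and updateCounters() have default implementations, a writer only has to supply the three methods above. That default-method design is what lets QueryDatabaseTable and QueryDatabaseTableRecord share AbstractQueryDatabaseTable and differ only in the SqlWriter returned from configureSqlWriter().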