Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2945#discussion_r217493130
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
---
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+
+
+public abstract class AbstractQueryDatabaseTable extends
AbstractDatabaseFetchProcessor {
+
+ public static final String RESULT_TABLENAME = "tablename";
+ public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
+
+ public static final String FRAGMENT_ID =
FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX =
FragmentAttributes.FRAGMENT_INDEX.key();
+
+ public static final PropertyDescriptor FETCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Fetch Size")
+ .description("The number of result rows to be fetched from the
result set at a time. This is a hint to the database driver and may not be "
+ + "honored and/or exact. If the value specified is
zero, then the hint is ignored.")
+ .defaultValue("0")
+ .required(true)
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new
PropertyDescriptor.Builder()
+ .name("qdbt-max-rows")
+ .displayName("Max Rows Per Flow File")
+ .description("The maximum number of result rows that will be
included in a single FlowFile. This will allow you to break up very large "
+ + "result sets into multiple FlowFiles. If the value
specified is zero, then all rows are returned in a single FlowFile.")
+ .defaultValue("0")
+ .required(true)
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("qdbt-output-batch-size")
+ .displayName("Output Batch Size")
+ .description("The number of output FlowFiles to queue before
committing the process session. When set to zero, the session will be committed
when all result set rows "
+ + "have been processed and the output FlowFiles are
ready for transfer to the downstream relationship. For large result sets, this
can cause a large burst of FlowFiles "
+ + "to be transferred at the end of processor
execution. If this property is set, then when the specified number of FlowFiles
are ready for transfer, then the session will "
+ + "be committed, thus releasing the FlowFiles to the
downstream relationship. NOTE: The maxvalue.* and fragment.count attributes
will not be set on FlowFiles when this "
+ + "property is set.")
+ .defaultValue("0")
+ .required(true)
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor MAX_FRAGMENTS = new
PropertyDescriptor.Builder()
+ .name("qdbt-max-frags")
+ .displayName("Maximum Number of Fragments")
+ .description("The maximum number of fragments. If the value
specified is zero, then all fragments are returned. " +
+ "This prevents OutOfMemoryError when this processor
ingests huge table. NOTE: Setting this property can result in data loss, as the
incoming results are "
+ + "not ordered, and fragments may end at arbitrary
boundaries where rows are not included in the result set.")
+ .defaultValue("0")
+ .required(true)
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propDescriptors;
+ }
+
+ @Override
+ protected PropertyDescriptor
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
true))
+
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dynamic(true)
+ .build();
+ }
+
+ @OnScheduled
+ public void setup(final ProcessContext context) {
+ maxValueProperties = getDefaultMaxValueProperties(context, null);
+ }
+
+ @OnStopped
+ public void stop() {
+ // Reset the column type map in case properties change
+ setupComplete.set(false);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSessionFactory sessionFactory) throws ProcessException {
+ // Fetch the column/table info once
+ if (!setupComplete.get()) {
+ super.setup(context);
+ }
+ ProcessSession session = sessionFactory.createSession();
+ final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
+ final ComponentLog logger = getLogger();
+
+ final DBCPService dbcpService =
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
--- End diff --
I'll take a look. We do a once-only setup above, but it is still in
onTrigger(); that was to avoid infinite timeouts in OnScheduled. If we can
share the DBCPService object between threads, then I can move it there.
---