This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new d1fd1f5 Support for flowfile attribute in TABLE_NAME
d1fd1f5 is described below
commit d1fd1f50922cf658dcbe1cbbef1e583ab63b95a8
Author: Michael Karpel <[email protected]>
AuthorDate: Sun May 12 11:31:02 2019 +0300
Support for flowfile attribute in TABLE_NAME
This closes #3472
Signed-off-by: Mike Thomsen <[email protected]>
---
.../java/org/apache/nifi/processors/kudu/PutKudu.java | 15 ++++++---------
1 file changed, 6 insertions(+), 9 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 9c0c503..a2889c7 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -74,8 +74,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import static
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
-
@EventDriven
@SupportsBatching
@RequiresInstanceClassLoading // Because of calls to
UserGroupInformation.setConfiguration
@@ -91,7 +89,7 @@ public class PutKudu extends AbstractProcessor {
.description("List all kudu masters's ip with port (e.g. 7051), comma
separated")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(VARIABLE_REGISTRY)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor TABLE_NAME = new Builder()
@@ -99,7 +97,7 @@ public class PutKudu extends AbstractProcessor {
.description("The name of the Kudu Table to put data into")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(VARIABLE_REGISTRY)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new
Builder()
@@ -169,7 +167,7 @@ public class PutKudu extends AbstractProcessor {
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.createLongValidator(1, 100000, true))
- .expressionLanguageSupported(VARIABLE_REGISTRY)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@@ -190,7 +188,6 @@ public class PutKudu extends AbstractProcessor {
protected int ffbatch = 1;
protected KuduClient kuduClient;
- protected KuduTable kuduTable;
private volatile KerberosUser kerberosUser;
@Override
@@ -220,7 +217,6 @@ public class PutKudu extends AbstractProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException,
LoginException {
- final String tableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String kuduMasters =
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
operationType =
OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
batchSize =
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
@@ -230,7 +226,6 @@ public class PutKudu extends AbstractProcessor {
getLogger().debug("Setting up Kudu connection...");
final KerberosCredentialsService credentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
kuduClient = createClient(kuduMasters, credentialsService);
- kuduTable = kuduClient.openTable(tableName);
getLogger().debug("Kudu connection successfully initialized");
}
@@ -307,9 +302,11 @@ public class PutKudu extends AbstractProcessor {
final List<RowError> pendingRowErrors = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
try (final InputStream in = session.read(flowFile);
- final RecordReader recordReader =
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+ final RecordReader recordReader =
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final List<String> fieldNames =
recordReader.getSchema().getFieldNames();
final RecordSet recordSet = recordReader.createRecordSet();
+ final String tableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final KuduTable kuduTable = kuduClient.openTable(tableName);
Record record = recordSet.next();
while (record != null) {