This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new d1fd1f5  Support for flowfile attribute in TABLE_NAME
d1fd1f5 is described below

commit d1fd1f50922cf658dcbe1cbbef1e583ab63b95a8
Author: Michael Karpel <[email protected]>
AuthorDate: Sun May 12 11:31:02 2019 +0300

    Support for flowfile attribute in TABLE_NAME
    
    This closes #3472
    
    Signed-off-by: Mike Thomsen <[email protected]>
---
 .../java/org/apache/nifi/processors/kudu/PutKudu.java     | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 9c0c503..a2889c7 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -74,8 +74,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static 
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
-
 @EventDriven
 @SupportsBatching
 @RequiresInstanceClassLoading // Because of calls to 
UserGroupInformation.setConfiguration
@@ -91,7 +89,7 @@ public class PutKudu extends AbstractProcessor {
         .description("List all kudu masters's ip with port (e.g. 7051), comma 
separated")
         .required(true)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(VARIABLE_REGISTRY)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .build();
 
     protected static final PropertyDescriptor TABLE_NAME = new Builder()
@@ -99,7 +97,7 @@ public class PutKudu extends AbstractProcessor {
         .description("The name of the Kudu Table to put data into")
         .required(true)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(VARIABLE_REGISTRY)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
 
     static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new 
Builder()
@@ -169,7 +167,7 @@ public class PutKudu extends AbstractProcessor {
         .defaultValue("100")
         .required(true)
         .addValidator(StandardValidators.createLongValidator(1, 100000, true))
-        .expressionLanguageSupported(VARIABLE_REGISTRY)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .build();
 
 
@@ -190,7 +188,6 @@ public class PutKudu extends AbstractProcessor {
     protected int ffbatch   = 1;
 
     protected KuduClient kuduClient;
-    protected KuduTable kuduTable;
     private volatile KerberosUser kerberosUser;
 
     @Override
@@ -220,7 +217,6 @@ public class PutKudu extends AbstractProcessor {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws IOException, 
LoginException {
-        final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
         final String kuduMasters = 
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
         operationType = 
OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
         batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
@@ -230,7 +226,6 @@ public class PutKudu extends AbstractProcessor {
         getLogger().debug("Setting up Kudu connection...");
         final KerberosCredentialsService credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
         kuduClient = createClient(kuduMasters, credentialsService);
-        kuduTable = kuduClient.openTable(tableName);
         getLogger().debug("Kudu connection successfully initialized");
     }
 
@@ -307,9 +302,11 @@ public class PutKudu extends AbstractProcessor {
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
             try (final InputStream in = session.read(flowFile);
-                 final RecordReader recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+                final RecordReader recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
                 final List<String> fieldNames = 
recordReader.getSchema().getFieldNames();
                 final RecordSet recordSet = recordReader.createRecordSet();
+                final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+                final KuduTable kuduTable = kuduClient.openTable(tableName);
 
                 Record record = recordSet.next();
                 while (record != null) {

Reply via email to