Repository: nifi
Updated Branches:
  refs/heads/master afd2b04af -> e9848f427


NIFI-3881 Fix PutHiveStreaming EL evaluation

Signed-off-by: Matt Burgess <[email protected]>

This closes #1791


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e9848f42
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e9848f42
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e9848f42

Branch: refs/heads/master
Commit: e9848f42767338d794f985c72a8be04fe5d1e698
Parents: afd2b04
Author: Tim Reardon <[email protected]>
Authored: Fri May 12 13:25:24 2017 -0400
Committer: Matt Burgess <[email protected]>
Committed: Fri May 12 14:06:32 2017 -0400

----------------------------------------------------------------------
 .../nifi/processors/hive/PutHiveStreaming.java  |  5 ++--
 .../processors/hive/TestPutHiveStreaming.java   | 31 --------------------
 2 files changed, 2 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e9848f42/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index e7d85cd..f08310e 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -179,7 +179,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
                     + "correspond exactly to the order of partition columns specified during the table creation.")
             .required(false)
-            .expressionLanguageSupported(false)
             .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
             .build();
 
@@ -329,7 +328,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
         final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
         final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
-        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
+        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions().asInteger();
         final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
         hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
 
@@ -559,7 +558,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         }
 
         final ComponentLog log = getLogger();
-        final Integer recordsPerTxn = context.getProperty(RECORDS_PER_TXN).asInteger();
+        final Integer recordsPerTxn = context.getProperty(RECORDS_PER_TXN).evaluateAttributeExpressions(flowFile).asInteger();
 
         // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
         ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();

http://git-wip-us.apache.org/repos/asf/nifi/blob/e9848f42/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
index f642607..6198619 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
@@ -107,7 +107,6 @@ public class TestPutHiveStreaming {
 
     @Test
     public void testSetup() throws Exception {
-        runner.setValidateExpressionUsage(false);
         runner.assertNotValid();
         runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
@@ -119,7 +118,6 @@ public class TestPutHiveStreaming {
 
     @Test
     public void testUgiGetsCleared() {
-        runner.setValidateExpressionUsage(false);
         runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
@@ -133,7 +131,6 @@ public class TestPutHiveStreaming {
         
when(hiveConf.get(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION)).thenReturn(SecurityUtil.KERBEROS);
         ugi = mock(UserGroupInformation.class);
         when(hiveConfigurator.authenticate(eq(hiveConf), anyString(), 
anyString(), anyLong(), any())).thenReturn(ugi);
-        runner.setValidateExpressionUsage(false);
         runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
@@ -149,7 +146,6 @@ public class TestPutHiveStreaming {
 
     @Test
     public void testSetupBadPartitionColumns() throws Exception {
-        runner.setValidateExpressionUsage(false);
         runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
@@ -161,7 +157,6 @@ public class TestPutHiveStreaming {
 
     @Test(expected = AssertionError.class)
     public void testSetupWithKerberosAuthFailed() throws Exception {
-        runner.setValidateExpressionUsage(false);
         runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
@@ -188,7 +183,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -208,7 +202,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
-        runner.setValidateExpressionUsage(false);
         runner.enqueue("I am not an Avro record".getBytes());
         runner.run();
 
@@ -222,7 +215,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
         runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
-        runner.setValidateExpressionUsage(false);
         runner.enqueue("I am not an Avro record".getBytes());
         try {
             runner.run();
@@ -243,7 +235,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "100");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -277,7 +268,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -311,7 +301,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
-        runner.setValidateExpressionUsage(false);
         processor.setGenerateWriteFailure(true, 1);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
@@ -359,7 +348,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
         runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
-        runner.setValidateExpressionUsage(false);
         processor.setGenerateWriteFailure(true, 1);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
@@ -401,7 +389,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
         runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
-        runner.setValidateExpressionUsage(false);
         // The first two records are committed, then an issue will happen at 
the 3rd record.
         processor.setGenerateWriteFailure(true, 2);
         Map<String, Object> user1 = new HashMap<String, Object>() {
@@ -481,7 +468,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
         runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, 
"favorite_number, favorite_color");
         runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "true");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -507,7 +493,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
         runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, 
"favorite_food");
         runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "false");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -533,7 +518,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, 
"favorite_food");
         runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "false");
         runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -563,7 +547,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "2");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -587,7 +570,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
         runner.setProperty(PutHiveStreaming.HEARTBEAT_INTERVAL, "1");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -613,7 +595,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -636,7 +617,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
         runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -665,7 +645,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -686,7 +665,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
         runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -711,7 +689,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -739,7 +716,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
         runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -772,7 +748,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -794,7 +769,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
         runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -822,7 +796,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -845,7 +818,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
         runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -874,7 +846,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -897,7 +868,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
         runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");
@@ -926,7 +896,6 @@ public class TestPutHiveStreaming {
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
-        runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");

Reply via email to