Repository: nifi
Updated Branches:
  refs/heads/master e603c486f -> 455e3c1bc


NIFI-5834: Restore default PutHiveQL error handling behavior

NIFI-5834: Incorporated review comments

This closes #3179.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/455e3c1b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/455e3c1b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/455e3c1b

Branch: refs/heads/master
Commit: 455e3c1bc8d4e12ea75d4e6ac2e4c58cbb535e5d
Parents: e603c48
Author: Matthew Burgess <[email protected]>
Authored: Tue Nov 20 17:58:59 2018 -0500
Committer: Koji Kawamura <[email protected]>
Committed: Tue Nov 27 18:10:26 2018 +0900

----------------------------------------------------------------------
 .../apache/nifi/processors/hive/PutHiveQL.java  |  7 +-
 .../nifi/processors/hive/TestPutHiveQL.java     | 80 +++++++++++++++++++-
 .../apache/nifi/processors/hive/PutHive3QL.java | 19 ++++-
 .../nifi/processors/hive/TestPutHive3QL.java    | 79 ++++++++++++++++++-
 4 files changed, 179 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/455e3c1b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
index e053a9a..bb5d526 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
@@ -149,10 +149,11 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
             } else if (e instanceof SQLException) {
                 // Use the SQLException's vendor code for guidance -- see 
Hive's ErrorMsg class for details on error codes
                 int errorCode = ((SQLException) e).getErrorCode();
+                getLogger().debug("Error occurred during Hive operation, Hive 
returned error code {}", new Object[]{errorCode});
                 if (errorCode >= 10000 && errorCode < 20000) {
                     return ErrorTypes.InvalidInput;
                 } else if (errorCode >= 20000 && errorCode < 30000) {
-                    return ErrorTypes.TemporalFailure;
+                    return ErrorTypes.InvalidInput;
                 } else if (errorCode >= 30000 && errorCode < 40000) {
                     return ErrorTypes.TemporalInputFailure;
                 } else if (errorCode >= 40000 && errorCode < 50000) {
@@ -160,7 +161,9 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
                     // a ProcessException, we'll route to failure via an 
InvalidInput error type.
                     return ErrorTypes.InvalidInput;
                 } else {
-                    return ErrorTypes.UnknownFailure;
+                    // Default unknown errors to TemporalFailure (as they were 
implemented originally), so they can be routed to failure
+                    // or rolled back depending on the user's setting of 
Rollback On Failure.
+                    return ErrorTypes.TemporalFailure;
                 }
             } else {
                 return ErrorTypes.UnknownFailure;

http://git-wip-us.apache.org/repos/asf/nifi/blob/455e3c1b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
index af737ae..dd16ca1 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
@@ -719,6 +719,78 @@ public class TestPutHiveQL {
         runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 0);
     }
 
+    @Test
+    public void testUnknownFailure() throws InitializationException, 
ProcessException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final SQLExceptionService service = new SQLExceptionService(null);
+        service.setErrorCode(2);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
+                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.1.value", "1");
+
+        attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("hiveql.args.2.value", "Mark");
+
+        attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.3.value", "84");
+
+        attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        runner.run();
+
+        // should fail because there isn't a valid connection and tables don't 
exist.
+        runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 1);
+    }
+
+    @Test
+    public void testUnknownFailureRollbackOnFailure() throws 
InitializationException, ProcessException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final SQLExceptionService service = new SQLExceptionService(null);
+        service.setErrorCode(0);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
+                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.1.value", "1");
+
+        attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("hiveql.args.2.value", "Mark");
+
+        attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.3.value", "84");
+
+        attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        try {
+            runner.run();
+            fail("Should throw ProcessException");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+        runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 0);
+    }
+
     /**
      * Simple implementation only for testing purposes
      */
@@ -758,6 +830,7 @@ public class TestPutHiveQL {
         private final HiveDBCPService service;
         private int allowedBeforeFailure = 0;
         private int successful = 0;
+        private int errorCode = 30000; // Default to a retryable exception code
 
         SQLExceptionService(final HiveDBCPService service) {
             this.service = service;
@@ -773,8 +846,7 @@ public class TestPutHiveQL {
             try {
                 if (++successful > allowedBeforeFailure) {
                     final Connection conn = Mockito.mock(Connection.class);
-                    // Throw a retryable error
-                    
Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new 
SQLException("Unit Test Generated SQLException", "42000", 20000));
+                    
Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new 
SQLException("Unit Test Generated SQLException", "42000", errorCode));
                     return conn;
                 } else {
                     return service.getConnection();
@@ -789,5 +861,9 @@ public class TestPutHiveQL {
         public String getConnectionURL() {
             return service != null ? service.getConnectionURL() : null;
         }
+
+        void setErrorCode(int errorCode) {
+            this.errorCode = errorCode;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/455e3c1b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
index 989d085..162d4fe 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
@@ -148,7 +148,24 @@ public class PutHive3QL extends AbstractHive3QLProcessor {
             if (e instanceof SQLNonTransientException) {
                 return ErrorTypes.InvalidInput;
             } else if (e instanceof SQLException) {
-                return ErrorTypes.TemporalFailure;
+                // Use the SQLException's vendor code for guidance -- see 
Hive's ErrorMsg class for details on error codes
+                int errorCode = ((SQLException) e).getErrorCode();
+                getLogger().debug("Error occurred during Hive operation, Hive 
returned error code {}", new Object[]{errorCode});
+                if (errorCode >= 10000 && errorCode < 20000) {
+                    return ErrorTypes.InvalidInput;
+                } else if (errorCode >= 20000 && errorCode < 30000) {
+                    return ErrorTypes.InvalidInput;
+                } else if (errorCode >= 30000 && errorCode < 40000) {
+                    return ErrorTypes.TemporalInputFailure;
+                } else if (errorCode >= 40000 && errorCode < 50000) {
+                    // These are unknown errors (to include some parse 
errors), but rather than generating an UnknownFailure which causes
+                    // a ProcessException, we'll route to failure via an 
InvalidInput error type.
+                    return ErrorTypes.InvalidInput;
+                } else {
+                    // Default unknown errors to TemporalFailure (as they were 
implemented originally), so they can be routed to failure
+                    // or rolled back depending on the user's setting of 
Rollback On Failure.
+                    return ErrorTypes.TemporalFailure;
+                }
             } else {
                 return ErrorTypes.UnknownFailure;
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/455e3c1b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3QL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3QL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3QL.java
index 99b0b7d..fd571ab 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3QL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3QL.java
@@ -719,6 +719,78 @@ public class TestPutHive3QL {
         runner.assertAllFlowFilesTransferred(PutHive3QL.REL_RETRY, 0);
     }
 
+    @Test
+    public void testUnknownFailure() throws InitializationException, 
ProcessException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHive3QL.class);
+        final SQLExceptionService service = new SQLExceptionService(null);
+        service.setErrorCode(2);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(PutHive3QL.HIVE_DBCP_SERVICE, "dbcp");
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
+                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.1.value", "1");
+
+        attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("hiveql.args.2.value", "Mark");
+
+        attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.3.value", "84");
+
+        attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        runner.run();
+
+        // should fail because there isn't a valid connection and tables don't 
exist.
+        runner.assertAllFlowFilesTransferred(PutHive3QL.REL_RETRY, 1);
+    }
+
+    @Test
+    public void testUnknownFailureRollbackOnFailure() throws 
InitializationException, ProcessException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHive3QL.class);
+        final SQLExceptionService service = new SQLExceptionService(null);
+        service.setErrorCode(0);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(PutHive3QL.HIVE_DBCP_SERVICE, "dbcp");
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
+                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.1.value", "1");
+
+        attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("hiveql.args.2.value", "Mark");
+
+        attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.3.value", "84");
+
+        attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        try {
+            runner.run();
+            fail("Should throw ProcessException");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+        runner.assertAllFlowFilesTransferred(PutHive3QL.REL_RETRY, 0);
+    }
+
     /**
      * Simple implementation only for testing purposes
      */
@@ -758,6 +830,7 @@ public class TestPutHive3QL {
         private final Hive3DBCPService service;
         private int allowedBeforeFailure = 0;
         private int successful = 0;
+        private int errorCode = 30000; // Default to a retryable exception code
 
         SQLExceptionService(final Hive3DBCPService service) {
             this.service = service;
@@ -773,7 +846,7 @@ public class TestPutHive3QL {
             try {
                 if (++successful > allowedBeforeFailure) {
                     final Connection conn = Mockito.mock(Connection.class);
-                    
Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new 
SQLException("Unit Test Generated SQLException"));
+                    
Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new 
SQLException("Unit Test Generated SQLException", "42000", errorCode));
                     return conn;
                 } else {
                     return service.getConnection();
@@ -788,5 +861,9 @@ public class TestPutHive3QL {
         public String getConnectionURL() {
             return service != null ? service.getConnectionURL() : null;
         }
+
+        void setErrorCode(int errorCode) {
+            this.errorCode = errorCode;
+        }
     }
 }

Reply via email to