Repository: nifi
Updated Branches:
  refs/heads/master e603c486f -> 455e3c1bc
NIFI-5834: Restore default PutHiveQL error handling behavior NIFI-5834: Incorporated review comments This closes #3179. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/455e3c1b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/455e3c1b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/455e3c1b Branch: refs/heads/master Commit: 455e3c1bc8d4e12ea75d4e6ac2e4c58cbb535e5d Parents: e603c48 Author: Matthew Burgess <[email protected]> Authored: Tue Nov 20 17:58:59 2018 -0500 Committer: Koji Kawamura <[email protected]> Committed: Tue Nov 27 18:10:26 2018 +0900 ---------------------------------------------------------------------- .../apache/nifi/processors/hive/PutHiveQL.java | 7 +- .../nifi/processors/hive/TestPutHiveQL.java | 80 +++++++++++++++++++- .../apache/nifi/processors/hive/PutHive3QL.java | 19 ++++- .../nifi/processors/hive/TestPutHive3QL.java | 79 ++++++++++++++++++- 4 files changed, 179 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/455e3c1b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java index e053a9a..bb5d526 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java @@ -149,10 +149,11 @@ public class PutHiveQL extends AbstractHiveQLProcessor { } else if (e 
instanceof SQLException) { // Use the SQLException's vendor code for guidance -- see Hive's ErrorMsg class for details on error codes int errorCode = ((SQLException) e).getErrorCode(); + getLogger().debug("Error occurred during Hive operation, Hive returned error code {}", new Object[]{errorCode}); if (errorCode >= 10000 && errorCode < 20000) { return ErrorTypes.InvalidInput; } else if (errorCode >= 20000 && errorCode < 30000) { - return ErrorTypes.TemporalFailure; + return ErrorTypes.InvalidInput; } else if (errorCode >= 30000 && errorCode < 40000) { return ErrorTypes.TemporalInputFailure; } else if (errorCode >= 40000 && errorCode < 50000) { @@ -160,7 +161,9 @@ public class PutHiveQL extends AbstractHiveQLProcessor { // a ProcessException, we'll route to failure via an InvalidInput error type. return ErrorTypes.InvalidInput; } else { - return ErrorTypes.UnknownFailure; + // Default unknown errors to TemporalFailure (as they were implemented originally), so they can be routed to failure + // or rolled back depending on the user's setting of Rollback On Failure. 
+ return ErrorTypes.TemporalFailure; } } else { return ErrorTypes.UnknownFailure; http://git-wip-us.apache.org/repos/asf/nifi/blob/455e3c1b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java index af737ae..dd16ca1 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java @@ -719,6 +719,78 @@ public class TestPutHiveQL { runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 0); } + @Test + public void testUnknownFailure() throws InitializationException, ProcessException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final SQLExceptionService service = new SQLExceptionService(null); + service.setErrorCode(2); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.1.value", "1"); + + attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("hiveql.args.2.value", "Mark"); + + attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.3.value", "84"); + + attributes.put("hiveql.args.4.type", 
String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should fail because there isn't a valid connection and tables don't exist. + runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 1); + } + + @Test + public void testUnknownFailureRollbackOnFailure() throws InitializationException, ProcessException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final SQLExceptionService service = new SQLExceptionService(null); + service.setErrorCode(0); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.1.value", "1"); + + attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("hiveql.args.2.value", "Mark"); + + attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.3.value", "84"); + + attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + try { + runner.run(); + fail("Should throw ProcessException"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + assertEquals(1, runner.getQueueSize().getObjectCount()); + runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 0); + } + /** * Simple implementation only for testing purposes */ @@ -758,6 +830,7 @@ public class TestPutHiveQL { private final HiveDBCPService service; private int allowedBeforeFailure = 0; private int 
successful = 0; + private int errorCode = 30000; // Default to a retryable exception code SQLExceptionService(final HiveDBCPService service) { this.service = service; @@ -773,8 +846,7 @@ public class TestPutHiveQL { try { if (++successful > allowedBeforeFailure) { final Connection conn = Mockito.mock(Connection.class); - // Throw a retryable error - Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException", "42000", 20000)); + Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException", "42000", errorCode)); return conn; } else { return service.getConnection(); @@ -789,5 +861,9 @@ public class TestPutHiveQL { public String getConnectionURL() { return service != null ? service.getConnectionURL() : null; } + + void setErrorCode(int errorCode) { + this.errorCode = errorCode; + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/455e3c1b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java index 989d085..162d4fe 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java @@ -148,7 +148,24 @@ public class PutHive3QL extends AbstractHive3QLProcessor { if (e instanceof SQLNonTransientException) { return ErrorTypes.InvalidInput; } else if (e instanceof SQLException) { - return ErrorTypes.TemporalFailure; + // Use the SQLException's vendor code for guidance -- see Hive's 
ErrorMsg class for details on error codes + int errorCode = ((SQLException) e).getErrorCode(); + getLogger().debug("Error occurred during Hive operation, Hive returned error code {}", new Object[]{errorCode}); + if (errorCode >= 10000 && errorCode < 20000) { + return ErrorTypes.InvalidInput; + } else if (errorCode >= 20000 && errorCode < 30000) { + return ErrorTypes.InvalidInput; + } else if (errorCode >= 30000 && errorCode < 40000) { + return ErrorTypes.TemporalInputFailure; + } else if (errorCode >= 40000 && errorCode < 50000) { + // These are unknown errors (to include some parse errors), but rather than generating an UnknownFailure which causes + // a ProcessException, we'll route to failure via an InvalidInput error type. + return ErrorTypes.InvalidInput; + } else { + // Default unknown errors to TemporalFailure (as they were implemented originally), so they can be routed to failure + // or rolled back depending on the user's setting of Rollback On Failure. + return ErrorTypes.TemporalFailure; + } } else { return ErrorTypes.UnknownFailure; } http://git-wip-us.apache.org/repos/asf/nifi/blob/455e3c1b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3QL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3QL.java index 99b0b7d..fd571ab 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3QL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3QL.java @@ -719,6 +719,78 @@ public class TestPutHive3QL { runner.assertAllFlowFilesTransferred(PutHive3QL.REL_RETRY, 0); } + @Test + public void 
testUnknownFailure() throws InitializationException, ProcessException { + final TestRunner runner = TestRunners.newTestRunner(PutHive3QL.class); + final SQLExceptionService service = new SQLExceptionService(null); + service.setErrorCode(2); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutHive3QL.HIVE_DBCP_SERVICE, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.1.value", "1"); + + attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("hiveql.args.2.value", "Mark"); + + attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.3.value", "84"); + + attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should fail because there isn't a valid connection and tables don't exist. 
+ runner.assertAllFlowFilesTransferred(PutHive3QL.REL_RETRY, 1); + } + + @Test + public void testUnknownFailureRollbackOnFailure() throws InitializationException, ProcessException { + final TestRunner runner = TestRunners.newTestRunner(PutHive3QL.class); + final SQLExceptionService service = new SQLExceptionService(null); + service.setErrorCode(0); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutHive3QL.HIVE_DBCP_SERVICE, "dbcp"); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.1.value", "1"); + + attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("hiveql.args.2.value", "Mark"); + + attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.3.value", "84"); + + attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + try { + runner.run(); + fail("Should throw ProcessException"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + assertEquals(1, runner.getQueueSize().getObjectCount()); + runner.assertAllFlowFilesTransferred(PutHive3QL.REL_RETRY, 0); + } + /** * Simple implementation only for testing purposes */ @@ -758,6 +830,7 @@ public class TestPutHive3QL { private final Hive3DBCPService service; private int allowedBeforeFailure = 0; private int successful = 0; + private int errorCode = 30000; // Default to a retryable exception code SQLExceptionService(final Hive3DBCPService service) { this.service = service; @@ -773,7 +846,7 @@ public class TestPutHive3QL { try 
{ if (++successful > allowedBeforeFailure) { final Connection conn = Mockito.mock(Connection.class); - Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException")); + Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException", "42000", errorCode)); return conn; } else { return service.getConnection(); @@ -788,5 +861,9 @@ public class TestPutHive3QL { public String getConnectionURL() { return service != null ? service.getConnectionURL() : null; } + + void setErrorCode(int errorCode) { + this.errorCode = errorCode; + } } }
