Repository: nifi Updated Branches: refs/heads/master 099bfcdf3 -> 64356e001
NIFI-5049 Fix handling of Phoenix datetime columns This closes #2625 Signed-off-by: Mike Thomsen <mikerthom...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/64356e00 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/64356e00 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/64356e00 Branch: refs/heads/master Commit: 64356e001432b78d919604d13787ea7b40e80e8e Parents: 099bfcd Author: Gardella Juan Pablo <gardellajuanpa...@gmail.com> Authored: Wed Apr 11 01:23:00 2018 -0300 Committer: Mike Thomsen <mikerthom...@gmail.com> Committed: Fri May 11 09:51:24 2018 -0400 ---------------------------------------------------------------------- .../AbstractDatabaseFetchProcessor.java | 24 +++++-- .../processors/standard/QueryDatabaseTable.java | 7 +- .../db/impl/PhoenixDatabaseAdapter.java | 72 ++++++++++++++++++++ .../standard/QueryDatabaseTableTest.java | 38 +++++++++++ 4 files changed, 133 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/64356e00/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index 0210739..924c7da 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -30,6 +30,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.db.DatabaseAdapter; +import org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter; import org.apache.nifi.util.StringUtils; import java.io.IOException; @@ -485,13 +486,26 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact case NVARCHAR: case VARCHAR: case ROWID: - case DATE: - case TIME: return "'" + value + "'"; + case TIME: + if (PhoenixDatabaseAdapter.NAME.equals(databaseType)) { + return "time '" + value + "'"; + } + case DATE: case TIMESTAMP: - if (!StringUtils.isEmpty(databaseType) && databaseType.contains("Oracle")) { - // For backwards compatibility, the type might be TIMESTAMP but the state value is in DATE format. This should be a one-time occurrence as the next maximum value - // should be stored as a full timestamp. Even so, check to see if the value is missing time-of-day information, and use the "date" coercion rather than the + // TODO delegate to database adapter the conversion instead of using if in this + // class. + // TODO (cont) if a new else is added, please refactor the code. + // Ideally we should probably have a method on the adapter to get a clause that + // coerces a + // column to a Timestamp if need be (the generic one can be a no-op) + if (!StringUtils.isEmpty(databaseType) + && (databaseType.contains("Oracle") || PhoenixDatabaseAdapter.NAME.equals(databaseType))) { + // For backwards compatibility, the type might be TIMESTAMP but the state value + // is in DATE format. This should be a one-time occurrence as the next maximum + // value + // should be stored as a full timestamp. 
Even so, check to see if the value is + // missing time-of-day information, and use the "date" coercion rather than the // "timestamp" coercion in that case if (value.matches("\\d{4}-\\d{2}-\\d{2}")) { return "date '" + value + "'"; http://git-wip-us.apache.org/repos/asf/nifi/blob/64356e00/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 1e8750f..c12b3b9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -310,9 +310,10 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue(); st.setQueryTimeout(queryTimeout); // timeout in seconds - try { - logger.debug("Executing query {}", new Object[]{selectQuery}); - final ResultSet resultSet = st.executeQuery(selectQuery); + if (logger.isDebugEnabled()) { + logger.debug("Executing query {}", new Object[] { selectQuery }); + } + try (final ResultSet resultSet = st.executeQuery(selectQuery)) { int fragmentIndex=0; while(true) { final AtomicLong nrOfRows = new AtomicLong(0L); http://git-wip-us.apache.org/repos/asf/nifi/blob/64356e00/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java new file mode 100644 index 0000000..92522db --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.db.impl; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.processors.standard.db.DatabaseAdapter; + +/** + * A Apache Phoenix database adapter that generates ANSI SQL. 
+ */ +public final class PhoenixDatabaseAdapter implements DatabaseAdapter { + public static final String NAME = "Phoenix"; + + @Override + public String getName() { + return NAME; + } + + @Override + public String getDescription() { + return "Generates Phoenix compliant SQL"; + } + + @Override + public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, + Long limit, Long offset) { + if (StringUtils.isEmpty(tableName)) { + throw new IllegalArgumentException("Table name cannot be null or empty"); + } + final StringBuilder query = new StringBuilder("SELECT "); + if (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) { + query.append("*"); + } else { + query.append(columnNames); + } + query.append(" FROM "); + query.append(tableName); + + if (!StringUtils.isEmpty(whereClause)) { + query.append(" WHERE "); + query.append(whereClause); + } + if (!StringUtils.isEmpty(orderByClause)) { + query.append(" ORDER BY "); + query.append(orderByClause); + } + if (limit != null) { + query.append(" LIMIT "); + query.append(limit); + } + if (offset != null && offset > 0) { + query.append(" OFFSET "); + query.append(offset); + } + + return query.toString(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/64356e00/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index ff6a7f0..87bf2a2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -32,6 +32,7 @@ import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter; import org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter; import org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter; import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter; +import org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -180,6 +181,43 @@ public class QueryDatabaseTableTest { dbAdapter = new OracleDatabaseAdapter(); query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap()); assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND (type = \"CUSTOMER\")", query); + + // Test time. 
+ processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", Types.TIME); + maxValues.clear(); + maxValues.put("id", "509"); + maxValues.put("time_created", "12:34:57"); + maxValues.put("date_created", "2016-03-07 12:34:56"); + stateManager = runner.getStateManager(); + stateManager.clear(Scope.CLUSTER); + stateManager.setState(maxValues, Scope.CLUSTER); + query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap()); + assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= timestamp '12:34:57' AND (type = \"CUSTOMER\")", query); + dbAdapter = new GenericDatabaseAdapter(); + query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap()); + assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = \"CUSTOMER\")", query); + } + + @Test + public void testGetQueryUsingPhoenixAdapter() throws Exception { + Map<String, String> maxValues = new HashMap<>(); + StateManager stateManager = runner.getStateManager(); + processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER); + processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", Types.TIME); + processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", Types.TIMESTAMP); + + maxValues.put("id", "509"); + maxValues.put("time_created", "12:34:57"); + maxValues.put("date_created", "2016-03-07 12:34:56"); + stateManager.setState(maxValues, Scope.CLUSTER); + + dbAdapter = new PhoenixDatabaseAdapter(); + String query = processor.getQuery(dbAdapter, "myTable", null, 
Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap()); + assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= time '12:34:57' AND (type = \"CUSTOMER\")", query); + // Cover the other path + dbAdapter = new GenericDatabaseAdapter(); + query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap()); + assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = \"CUSTOMER\")", query); } @Test(expected = IllegalArgumentException.class)