Repository: nifi
Updated Branches:
  refs/heads/master 099bfcdf3 -> 64356e001


NIFI-5049 Fix handling of Phoenix datetime columns

This closes #2625

Signed-off-by: Mike Thomsen <mikerthom...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/64356e00
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/64356e00
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/64356e00

Branch: refs/heads/master
Commit: 64356e001432b78d919604d13787ea7b40e80e8e
Parents: 099bfcd
Author: Gardella Juan Pablo <gardellajuanpa...@gmail.com>
Authored: Wed Apr 11 01:23:00 2018 -0300
Committer: Mike Thomsen <mikerthom...@gmail.com>
Committed: Fri May 11 09:51:24 2018 -0400

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java         | 24 +++++--
 .../processors/standard/QueryDatabaseTable.java |  7 +-
 .../db/impl/PhoenixDatabaseAdapter.java         | 72 ++++++++++++++++++++
 .../standard/QueryDatabaseTableTest.java        | 38 +++++++++++
 4 files changed, 133 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/64356e00/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index 0210739..924c7da 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -30,6 +30,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter;
 import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
@@ -485,13 +486,26 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
             case NVARCHAR:
             case VARCHAR:
             case ROWID:
-            case DATE:
-            case TIME:
                 return "'" + value + "'";
+            case TIME:
+                if (PhoenixDatabaseAdapter.NAME.equals(databaseType)) {
+                    return "time '" + value + "'";
+                }
+            case DATE:
             case TIMESTAMP:
-                if (!StringUtils.isEmpty(databaseType) && 
databaseType.contains("Oracle")) {
-                    // For backwards compatibility, the type might be 
TIMESTAMP but the state value is in DATE format. This should be a one-time 
occurrence as the next maximum value
-                    // should be stored as a full timestamp. Even so, check to 
see if the value is missing time-of-day information, and use the "date" 
coercion rather than the
+                // TODO delegate to database adapter the conversion instead of 
using if in this
+                // class.
+                // TODO (cont) if a new else is added, please refactor the 
code.
+                // Ideally we should probably have a method on the adapter to 
get a clause that
+                // coerces a
+                // column to a Timestamp if need be (the generic one can be a 
no-op)
+                if (!StringUtils.isEmpty(databaseType)
+                        && (databaseType.contains("Oracle") || 
PhoenixDatabaseAdapter.NAME.equals(databaseType))) {
+                    // For backwards compatibility, the type might be 
TIMESTAMP but the state value
+                    // is in DATE format. This should be a one-time occurrence 
as the next maximum
+                    // value
+                    // should be stored as a full timestamp. Even so, check to 
see if the value is
+                    // missing time-of-day information, and use the "date" 
coercion rather than the
                     // "timestamp" coercion in that case
                     if (value.matches("\\d{4}-\\d{2}-\\d{2}")) {
                         return "date '" + value + "'";

http://git-wip-us.apache.org/repos/asf/nifi/blob/64356e00/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 1e8750f..c12b3b9 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -310,9 +310,10 @@ public class QueryDatabaseTable extends 
AbstractDatabaseFetchProcessor {
 
             final Integer queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
             st.setQueryTimeout(queryTimeout); // timeout in seconds
-            try {
-                logger.debug("Executing query {}", new Object[]{selectQuery});
-                final ResultSet resultSet = st.executeQuery(selectQuery);
+            if (logger.isDebugEnabled()) {
+                logger.debug("Executing query {}", new Object[] { selectQuery 
});
+            }
+            try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
                 int fragmentIndex=0;
                 while(true) {
                     final AtomicLong nrOfRows = new AtomicLong(0L);

http://git-wip-us.apache.org/repos/asf/nifi/blob/64356e00/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
new file mode 100644
index 0000000..92522db
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+
+/**
+ * An Apache Phoenix database adapter that generates Phoenix-compliant SQL.
+ */
+public final class PhoenixDatabaseAdapter implements DatabaseAdapter {
+    public static final String NAME = "Phoenix"; // value returned by getName()
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Generates Phoenix compliant SQL";
+    }
+
+    @Override
+    public String getSelectStatement(String tableName, String columnNames, 
String whereClause, String orderByClause,
+            Long limit, Long offset) {
+        if (StringUtils.isEmpty(tableName)) {
+            throw new IllegalArgumentException("Table name cannot be null or 
empty");
+        }
+        final StringBuilder query = new StringBuilder("SELECT ");
+        if (StringUtils.isEmpty(columnNames) || 
columnNames.trim().equals("*")) { // empty column list or "*" selects every column
+            query.append("*");
+        } else {
+            query.append(columnNames);
+        }
+        query.append(" FROM ");
+        query.append(tableName);
+
+        if (!StringUtils.isEmpty(whereClause)) { // WHERE is optional
+            query.append(" WHERE ");
+            query.append(whereClause);
+        }
+        if (!StringUtils.isEmpty(orderByClause)) { // ORDER BY is optional
+            query.append(" ORDER BY ");
+            query.append(orderByClause);
+        }
+        if (limit != null) { // LIMIT is appended only when a limit is supplied
+            query.append(" LIMIT ");
+            query.append(limit);
+        }
+        if (offset != null && offset > 0) { // a null or non-positive offset adds no OFFSET clause
+            query.append(" OFFSET ");
+            query.append(offset);
+        }
+
+        return query.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/64356e00/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index ff6a7f0..87bf2a2 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -32,6 +32,7 @@ import 
org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
 import org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter;
 import org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter;
 import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -180,6 +181,43 @@ public class QueryDatabaseTableTest {
         dbAdapter = new OracleDatabaseAdapter();
         query = processor.getQuery(dbAdapter, "myTable", null, 
Arrays.asList("id", "DATE_CREATED"), "type = \"CUSTOMER\"", 
stateManager.getState(Scope.CLUSTER).toMap());
         assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= 
timestamp '2016-03-07 12:34:56' AND (type = \"CUSTOMER\")", query);
+
+        // Test time.
+        processor.putColumnType("mytable" + 
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", 
Types.TIME);
+        maxValues.clear();
+        maxValues.put("id", "509");
+        maxValues.put("time_created", "12:34:57");
+        maxValues.put("date_created", "2016-03-07 12:34:56");
+        stateManager = runner.getStateManager();
+        stateManager.clear(Scope.CLUSTER);
+        stateManager.setState(maxValues, Scope.CLUSTER);
+        query = processor.getQuery(dbAdapter, "myTable", null, 
Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", 
stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= 
timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= timestamp '12:34:57' AND 
(type = \"CUSTOMER\")", query);
+        dbAdapter = new GenericDatabaseAdapter();
+        query = processor.getQuery(dbAdapter, "myTable", null, 
Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", 
stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= 
'2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = 
\"CUSTOMER\")", query);
+    }
+
+    @Test
+    public void testGetQueryUsingPhoenixAdapter() throws Exception {
+        Map<String, String> maxValues = new HashMap<>();
+        StateManager stateManager = runner.getStateManager();
+        processor.putColumnType("mytable" + 
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER);
+        processor.putColumnType("mytable" + 
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", 
Types.TIME);
+        processor.putColumnType("mytable" + 
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", 
Types.TIMESTAMP);
+
+        maxValues.put("id", "509");
+        maxValues.put("time_created", "12:34:57");
+        maxValues.put("date_created", "2016-03-07 12:34:56");
+        stateManager.setState(maxValues, Scope.CLUSTER);
+
+        dbAdapter = new PhoenixDatabaseAdapter();
+        String query = processor.getQuery(dbAdapter, "myTable", null, 
Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", 
stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= 
timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= time '12:34:57' AND (type = 
\"CUSTOMER\")", query);
+        // Cover the non-Phoenix path: the generic adapter emits plain quoted literals
+        dbAdapter = new GenericDatabaseAdapter();
+        query = processor.getQuery(dbAdapter, "myTable", null, 
Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", 
stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= 
'2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = 
\"CUSTOMER\")", query);
     }
 
     @Test(expected = IllegalArgumentException.class)

Reply via email to