Repository: incubator-impala
Updated Branches:
  refs/heads/master d04f96b99 -> 24c77f194


IMPALA-5137: Support pushing TIMESTAMP predicates to Kudu

This change builds on the support for reading and writing
TIMESTAMP columns in Kudu tables (see [1]) by adding support
for pushing TIMESTAMP predicates down to Kudu scans.

Binary predicates and IN list predicates are supported.

Testing: Added planner and end-to-end (EE) tests to validate
the behavior.

1: https://gerrit.cloudera.org/#/c/6526/

Change-Id: I08b6c8354a408e7beb94c1a135c23722977246ea
Reviewed-on: http://gerrit.cloudera.org:8080/6789
Reviewed-by: Matthew Jacobs <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/24c77f19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/24c77f19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/24c77f19

Branch: refs/heads/master
Commit: 24c77f194b68f7ac5dd3af56b834be623949e280
Parents: d04f96b
Author: Matthew Jacobs <[email protected]>
Authored: Wed May 3 10:49:54 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu May 18 21:09:51 2017 +0000

----------------------------------------------------------------------
 be/src/exprs/timestamp-functions-ir.cc          |  8 +++
 be/src/exprs/timestamp-functions.h              | 16 +++++-
 common/function-registry/impala_functions.py    |  2 +
 .../org/apache/impala/planner/KuduScanNode.java | 26 +++++++--
 .../java/org/apache/impala/util/KuduUtil.java   | 21 ++++++++
 .../queries/PlannerTest/kudu-selectivity.test   | 35 ++++++++++--
 .../queries/QueryTest/kudu-scan-node.test       | 57 +++++++++++++++++++-
 7 files changed, 157 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/be/src/exprs/timestamp-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/timestamp-functions-ir.cc 
b/be/src/exprs/timestamp-functions-ir.cc
index 6a80e71..7c6f8d5 100644
--- a/be/src/exprs/timestamp-functions-ir.cc
+++ b/be/src/exprs/timestamp-functions-ir.cc
@@ -120,6 +120,14 @@ BigIntVal TimestampFunctions::Unix(FunctionContext* 
context) {
   }
 }
 
+BigIntVal TimestampFunctions::UtcToUnixMicros(FunctionContext* context,
+    const TimestampVal& ts_val) {
+  if (ts_val.is_null) return BigIntVal::null();
+  const TimestampValue& tv = TimestampValue::FromTimestampVal(ts_val);
+  int64_t result;
+  return (tv.UtcToUnixTimeMicros(&result)) ? BigIntVal(result) : 
BigIntVal::null();
+}
+
 TimestampVal TimestampFunctions::ToTimestamp(FunctionContext* context,
     const BigIntVal& bigint_val) {
   if (bigint_val.is_null) return TimestampVal::null();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/be/src/exprs/timestamp-functions.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/timestamp-functions.h 
b/be/src/exprs/timestamp-functions.h
index d703da6..b4c685b 100644
--- a/be/src/exprs/timestamp-functions.h
+++ b/be/src/exprs/timestamp-functions.h
@@ -61,13 +61,27 @@ class TimestampFunctions {
       FunctionContext::FunctionStateScope scope);
 
   /// Parses 'string_val' based on the format 'fmt'.
+  /// The time zone interpretation of the parsed timestamp is determined by
+  /// FLAGS_use_local_tz_for_unix_timestamp_conversions. If the flag is true, 
the
+  /// instance is interpreted as a local value. If the flag is false, UTC is 
assumed.
   static BigIntVal Unix(FunctionContext* context, const StringVal& string_val,
       const StringVal& fmt);
-  /// Converts 'tv_val' to a unix time_t
+
+  /// Converts 'tv_val' to a unix time_t.
+  /// The time zone interpretation of the specified timestamp is determined by
+  /// FLAGS_use_local_tz_for_unix_timestamp_conversions. If the flag is true, 
the
+  /// instance is interpreted as a local value. If the flag is false, UTC is 
assumed.
   static BigIntVal Unix(FunctionContext* context, const TimestampVal& tv_val);
+
   /// Returns the current time.
+  /// The time zone interpretation of the current time is determined by
+  /// FLAGS_use_local_tz_for_unix_timestamp_conversions. If the flag is true, 
the
+  /// instance is interpreted as a local value. If the flag is false, UTC is 
assumed.
   static BigIntVal Unix(FunctionContext* context);
 
+  /// Interpret 'tv_val' as a timestamp in UTC and convert to unix time in 
microseconds.
+  static BigIntVal UtcToUnixMicros(FunctionContext* context, const 
TimestampVal& tv_val);
+
   // Functions to convert to and from TimestampVal type
   static TimestampVal ToTimestamp(FunctionContext* context, const BigIntVal& 
bigint_val);
   static TimestampVal ToTimestamp(FunctionContext* context, const StringVal& 
date,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/common/function-registry/impala_functions.py
----------------------------------------------------------------------
diff --git a/common/function-registry/impala_functions.py 
b/common/function-registry/impala_functions.py
index fbe6719..296d705 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -209,6 +209,8 @@ visible_functions = [
   [['unix_timestamp'], 'BIGINT', ['STRING', 'STRING'], 
'_ZN6impala18TimestampFunctions4UnixEPN10impala_udf15FunctionContextERKNS1_9StringValES6_',
           
'_ZN6impala18TimestampFunctions22UnixAndFromUnixPrepareEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE',
           
'_ZN6impala18TimestampFunctions20UnixAndFromUnixCloseEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE'],
+  [['utc_to_unix_micros'], 'BIGINT', ['TIMESTAMP'],
+    
'_ZN6impala18TimestampFunctions15UtcToUnixMicrosEPN10impala_udf15FunctionContextERKNS1_12TimestampValE'],
   [['from_unixtime'], 'STRING', ['INT'],
       
'_ZN6impala18TimestampFunctions8FromUnixIN10impala_udf6IntValEEENS2_9StringValEPNS2_15FunctionContextERKT_'],
   [['from_unixtime'], 'STRING', ['INT', 'STRING'],

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index bb4181e..4156a72 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -37,7 +37,9 @@ import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.InternalException;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TKuduScanNode;
 import org.apache.impala.thrift.TNetworkAddress;
@@ -388,6 +390,16 @@ public class KuduScanNode extends ScanNode {
             ((StringLiteral)literal).getStringValue());
         break;
       }
+      case TIMESTAMP: {
+        try {
+          kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
+              KuduUtil.timestampToUnixTimeMicros(analyzer, literal));
+        } catch (Exception e) {
+          LOG.info("Exception converting Kudu timestamp predicate: " + 
expr.toSql(), e);
+          return false;
+        }
+        break;
+      }
       default: break;
     }
     if (kuduPredicate == null) return false;
@@ -422,8 +434,8 @@ public class KuduScanNode extends ScanNode {
       // Cannot push predicates with null literal values (KUDU-1595).
       if (literal instanceof NullLiteral) return false;
 
-      Object value = getKuduInListValue(literal);
-      Preconditions.checkNotNull(value);
+      Object value = getKuduInListValue(analyzer, literal);
+      if (value == null) return false;
       values.add(value);
     }
 
@@ -468,7 +480,7 @@ public class KuduScanNode extends ScanNode {
    * added to a KuduPredicate. If the Expr is not supported by Kudu or the 
type doesn't
    * match the expected PrimitiveType 'type', null is returned.
    */
-  private static Object getKuduInListValue(LiteralExpr e) {
+  private static Object getKuduInListValue(Analyzer analyzer, LiteralExpr e) {
     switch (e.getType().getPrimitiveType()) {
       case BOOLEAN: return ((BoolLiteral) e).getValue();
       case TINYINT: return (byte) ((NumericLiteral) e).getLongValue();
@@ -478,6 +490,14 @@ public class KuduScanNode extends ScanNode {
       case FLOAT: return (float) ((NumericLiteral) e).getDoubleValue();
       case DOUBLE: return ((NumericLiteral) e).getDoubleValue();
       case STRING: return ((StringLiteral) e).getValue();
+      case TIMESTAMP: {
+        try {
+          return KuduUtil.timestampToUnixTimeMicros(analyzer, e);
+        } catch (Exception ex) {
+          LOG.info("Exception converting Kudu timestamp expr: " + e.toSql(), 
ex);
+        }
+        break;
+      }
       default:
         Preconditions.checkState(false,
             "Unsupported Kudu type considered for predicate: %s", 
e.getType().toSql());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/fe/src/main/java/org/apache/impala/util/KuduUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java 
b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index 4aa1b2a..9fc13b7 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -22,15 +22,22 @@ import static java.lang.String.format;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TColumnEncoding;
+import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TExprNode;
 import org.apache.impala.thrift.TExprNodeType;
@@ -189,6 +196,20 @@ public class KuduUtil {
     }
   }
 
+  public static Long timestampToUnixTimeMicros(Analyzer analyzer, Expr 
timestampExpr)
+      throws AnalysisException, InternalException {
+    Preconditions.checkArgument(timestampExpr.isAnalyzed());
+    Preconditions.checkArgument(timestampExpr.isConstant());
+    Preconditions.checkArgument(timestampExpr.getType() == Type.TIMESTAMP);
+    Expr toUnixTimeExpr = new FunctionCallExpr("utc_to_unix_micros",
+        Lists.newArrayList(timestampExpr));
+    toUnixTimeExpr.analyze(analyzer);
+    TColumnValue result = FeSupport.EvalExprWithoutRow(toUnixTimeExpr,
+        analyzer.getQueryCtx());
+    Preconditions.checkArgument(result.isSetLong_val());
+    return result.getLong_val();
+  }
+
   public static Encoding fromThrift(TColumnEncoding encoding)
       throws ImpalaRuntimeException {
     switch (encoding) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
index bad3299..2ab2ba9 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
@@ -146,15 +146,44 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 select * from functional_kudu.alltypes where
 tinyint_col is not null and
 smallint_col is null and
-cast(date_string_col as tinyint) is null
+cast(date_string_col as tinyint) is null and
+timestamp_col > (nanoseconds_add(cast('1987-05-19 00:00:00' as timestamp), 
10)) and
+timestamp_col < (seconds_add(cast('9999-12-31 24:59:59' as timestamp), 10))
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B
   |
   00:SCAN KUDU [functional_kudu.alltypes]
-     predicates: CAST(date_string_col AS TINYINT) IS NULL
-     kudu predicates: smallint_col IS NULL, tinyint_col IS NOT NULL
+     predicates: CAST(date_string_col AS TINYINT) IS NULL, timestamp_col < NULL
+     kudu predicates: smallint_col IS NULL, tinyint_col IS NOT NULL, 
timestamp_col > TIMESTAMP '1987-05-19 00:00:00.000000010'
      mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=126B cardinality=730
 ====
+select * from functional_kudu.alltypes where
+timestamp_col in (cast('2010-03-01 00:00:00' as timestamp),
+                  cast('2010-03-01 00:01:00' as timestamp))
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+  PLAN-ROOT SINK
+  |  mem-estimate=0B mem-reservation=0B
+  |
+  00:SCAN KUDU [functional_kudu.alltypes]
+     kudu predicates: timestamp_col IN (TIMESTAMP '2010-03-01 00:00:00', 
TIMESTAMP '2010-03-01 00:01:00')
+     mem-estimate=0B mem-reservation=0B
+     tuple-ids=0 row-size=97B cardinality=1
+====
+select * from functional_kudu.alltypes where
+timestamp_col in (cast('2010-03-01 00:00:00' as timestamp),
+                  null,
+                  cast('2010-03-01 00:01:00' as timestamp))
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+  PLAN-ROOT SINK
+  |  mem-estimate=0B mem-reservation=0B
+  |
+  00:SCAN KUDU [functional_kudu.alltypes]
+     predicates: timestamp_col IN (TIMESTAMP '2010-03-01 00:00:00', NULL, 
TIMESTAMP '2010-03-01 00:01:00')
+     mem-estimate=0B mem-reservation=0B
+     tuple-ids=0 row-size=97B cardinality=3
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test 
b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
index 243b7e5..115affa 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
@@ -84,4 +84,59 @@ select count(*) from functional_kudu.alltypes where id is 
null;
 0
 ---- TYPES
 BIGINT
-====
\ No newline at end of file
+====
+---- QUERY
+# Push down TIMESTAMP binary predicates
+select id, timestamp_col from functional_kudu.alltypes where
+timestamp_col <= cast('2009-01-01 00:08:00.28' as timestamp) and
+timestamp_col >= cast('2009-01-01 00:04:00.6' as timestamp)
+order by id;
+---- RESULTS
+4,2009-01-01 00:04:00.600000000
+5,2009-01-01 00:05:00.100000000
+6,2009-01-01 00:06:00.150000000
+7,2009-01-01 00:07:00.210000000
+8,2009-01-01 00:08:00.280000000
+---- TYPES
+INT, TIMESTAMP
+====
+---- QUERY
+# Out-of-range TIMESTAMP predicate (evaluates to NULL)
+select id, timestamp_col from functional_kudu.alltypes where
+timestamp_col > cast('1000-01-01 00:00:00.00' as timestamp)
+---- RESULTS
+---- TYPES
+INT, TIMESTAMP
+====
+---- QUERY
+select id, timestamp_col from functional_kudu.alltypes where
+timestamp_col < cast('2009-01-01 00:08:00.28' as timestamp) and
+timestamp_col > cast('2009-01-01 00:04:00.6' as timestamp)
+order by id;
+---- RESULTS
+5,2009-01-01 00:05:00.100000000
+6,2009-01-01 00:06:00.150000000
+7,2009-01-01 00:07:00.210000000
+---- TYPES
+INT, TIMESTAMP
+====
+---- QUERY
+select id, timestamp_col from functional_kudu.alltypes where
+timestamp_col = cast('2009-01-01 00:08:00.28' as timestamp);
+---- RESULTS
+8,2009-01-01 00:08:00.280000000
+---- TYPES
+INT, TIMESTAMP
+====
+---- QUERY
+# Push down TIMESTAMP IN list predicates
+select id, timestamp_col from functional_kudu.alltypes where
+timestamp_col in (cast('2010-03-01 00:00:00' as timestamp),
+                  cast('2010-03-01 00:01:00' as timestamp))
+order by id;
+---- RESULTS
+4240,2010-03-01 00:00:00
+4241,2010-03-01 00:01:00
+---- TYPES
+INT, TIMESTAMP
+====

Reply via email to