Repository: incubator-impala Updated Branches: refs/heads/master d04f96b99 -> 24c77f194
IMPALA-5137: Support pushing TIMESTAMP predicates to Kudu This change builds on the support for reading and writing TIMESTAMP columns to Kudu tables (see [1]), adding support for pushing TIMESTAMP predicates to Kudu for scans. Binary predicates and IN list predicates are supported. Testing: Added some planner and EE tests to validate the behavior. 1: https://gerrit.cloudera.org/#/c/6526/ Change-Id: I08b6c8354a408e7beb94c1a135c23722977246ea Reviewed-on: http://gerrit.cloudera.org:8080/6789 Reviewed-by: Matthew Jacobs <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/24c77f19 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/24c77f19 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/24c77f19 Branch: refs/heads/master Commit: 24c77f194b68f7ac5dd3af56b834be623949e280 Parents: d04f96b Author: Matthew Jacobs <[email protected]> Authored: Wed May 3 10:49:54 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu May 18 21:09:51 2017 +0000 ---------------------------------------------------------------------- be/src/exprs/timestamp-functions-ir.cc | 8 +++ be/src/exprs/timestamp-functions.h | 16 +++++- common/function-registry/impala_functions.py | 2 + .../org/apache/impala/planner/KuduScanNode.java | 26 +++++++-- .../java/org/apache/impala/util/KuduUtil.java | 21 ++++++++ .../queries/PlannerTest/kudu-selectivity.test | 35 ++++++++++-- .../queries/QueryTest/kudu-scan-node.test | 57 +++++++++++++++++++- 7 files changed, 157 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/be/src/exprs/timestamp-functions-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/timestamp-functions-ir.cc b/be/src/exprs/timestamp-functions-ir.cc index 6a80e71..7c6f8d5 100644 --- a/be/src/exprs/timestamp-functions-ir.cc +++ b/be/src/exprs/timestamp-functions-ir.cc @@ -120,6 +120,14 @@ BigIntVal TimestampFunctions::Unix(FunctionContext* context) { } } +BigIntVal TimestampFunctions::UtcToUnixMicros(FunctionContext* context, + const TimestampVal& ts_val) { + if (ts_val.is_null) return BigIntVal::null(); + const TimestampValue& tv = TimestampValue::FromTimestampVal(ts_val); + int64_t result; + return (tv.UtcToUnixTimeMicros(&result)) ? BigIntVal(result) : BigIntVal::null(); +} + TimestampVal TimestampFunctions::ToTimestamp(FunctionContext* context, const BigIntVal& bigint_val) { if (bigint_val.is_null) return TimestampVal::null(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/be/src/exprs/timestamp-functions.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/timestamp-functions.h b/be/src/exprs/timestamp-functions.h index d703da6..b4c685b 100644 --- a/be/src/exprs/timestamp-functions.h +++ b/be/src/exprs/timestamp-functions.h @@ -61,13 +61,27 @@ class TimestampFunctions { FunctionContext::FunctionStateScope scope); /// Parses 'string_val' based on the format 'fmt'. + /// The time zone interpretation of the parsed timestamp is determined by + /// FLAGS_use_local_tz_for_unix_timestamp_conversions. If the flag is true, the + /// instance is interpreted as a local value. If the flag is false, UTC is assumed. static BigIntVal Unix(FunctionContext* context, const StringVal& string_val, const StringVal& fmt); - /// Converts 'tv_val' to a unix time_t + + /// Converts 'tv_val' to a unix time_t. + /// The time zone interpretation of the specified timestamp is determined by + /// FLAGS_use_local_tz_for_unix_timestamp_conversions. If the flag is true, the + /// instance is interpreted as a local value. If the flag is false, UTC is assumed. static BigIntVal Unix(FunctionContext* context, const TimestampVal& tv_val); + /// Returns the current time. + /// The time zone interpretation of the current time is determined by + /// FLAGS_use_local_tz_for_unix_timestamp_conversions. If the flag is true, the + /// instance is interpreted as a local value. If the flag is false, UTC is assumed. static BigIntVal Unix(FunctionContext* context); + /// Interpret 'tv_val' as a timestamp in UTC and convert to unix time in microseconds. + static BigIntVal UtcToUnixMicros(FunctionContext* context, const TimestampVal& tv_val); + // Functions to convert to and from TimestampVal type static TimestampVal ToTimestamp(FunctionContext* context, const BigIntVal& bigint_val); static TimestampVal ToTimestamp(FunctionContext* context, const StringVal& date, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/common/function-registry/impala_functions.py ---------------------------------------------------------------------- diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py index fbe6719..296d705 100644 --- a/common/function-registry/impala_functions.py +++ b/common/function-registry/impala_functions.py @@ -209,6 +209,8 @@ visible_functions = [ [['unix_timestamp'], 'BIGINT', ['STRING', 'STRING'], '_ZN6impala18TimestampFunctions4UnixEPN10impala_udf15FunctionContextERKNS1_9StringValES6_', '_ZN6impala18TimestampFunctions22UnixAndFromUnixPrepareEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE', '_ZN6impala18TimestampFunctions20UnixAndFromUnixCloseEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE'], + [['utc_to_unix_micros'], 'BIGINT', ['TIMESTAMP'], + '_ZN6impala18TimestampFunctions15UtcToUnixMicrosEPN10impala_udf15FunctionContextERKNS1_12TimestampValE'], [['from_unixtime'], 'STRING', ['INT'], '_ZN6impala18TimestampFunctions8FromUnixIN10impala_udf6IntValEEENS2_9StringValEPNS2_15FunctionContextERKT_'], [['from_unixtime'], 'STRING', ['INT', 'STRING'], http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index bb4181e..4156a72 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -37,7 +37,9 @@ import org.apache.impala.analysis.StringLiteral; import org.apache.impala.analysis.TupleDescriptor; import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.Type; +import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaRuntimeException; +import org.apache.impala.common.InternalException; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TKuduScanNode; import org.apache.impala.thrift.TNetworkAddress; @@ -388,6 +390,16 @@ public class KuduScanNode extends ScanNode { ((StringLiteral)literal).getStringValue()); break; } + case TIMESTAMP: { + try { + kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, + KuduUtil.timestampToUnixTimeMicros(analyzer, literal)); + } catch (Exception e) { + LOG.info("Exception converting Kudu timestamp predicate: " + expr.toSql(), e); + return false; + } + break; + } default: break; } if (kuduPredicate == null) return false; @@ -422,8 +434,8 @@ public class KuduScanNode extends ScanNode { // Cannot push predicates with null literal values (KUDU-1595). if (literal instanceof NullLiteral) return false; - Object value = getKuduInListValue(literal); - Preconditions.checkNotNull(value); + Object value = getKuduInListValue(analyzer, literal); + if (value == null) return false; values.add(value); } @@ -468,7 +480,7 @@ public class KuduScanNode extends ScanNode { * added to a KuduPredicate. If the Expr is not supported by Kudu or the type doesn't * match the expected PrimitiveType 'type', null is returned. */ - private static Object getKuduInListValue(LiteralExpr e) { + private static Object getKuduInListValue(Analyzer analyzer, LiteralExpr e) { switch (e.getType().getPrimitiveType()) { case BOOLEAN: return ((BoolLiteral) e).getValue(); case TINYINT: return (byte) ((NumericLiteral) e).getLongValue(); @@ -478,6 +490,14 @@ public class KuduScanNode extends ScanNode { case FLOAT: return (float) ((NumericLiteral) e).getDoubleValue(); case DOUBLE: return ((NumericLiteral) e).getDoubleValue(); case STRING: return ((StringLiteral) e).getValue(); + case TIMESTAMP: { + try { + return KuduUtil.timestampToUnixTimeMicros(analyzer, e); + } catch (Exception ex) { + LOG.info("Exception converting Kudu timestamp expr: " + e.toSql(), ex); + } + break; + } default: Preconditions.checkState(false, "Unsupported Kudu type considered for predicate: %s", e.getType().toSql()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/fe/src/main/java/org/apache/impala/util/KuduUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java index 4aa1b2a..9fc13b7 100644 --- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java +++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java @@ -22,15 +22,22 @@ import static java.lang.String.format; import java.util.HashSet; import java.util.List; +import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.FunctionCallExpr; import org.apache.impala.analysis.LiteralExpr; +import org.apache.impala.analysis.NumericLiteral; import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.Type; +import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaRuntimeException; +import org.apache.impala.common.InternalException; import org.apache.impala.common.Pair; import org.apache.impala.service.BackendConfig; +import org.apache.impala.service.FeSupport; import org.apache.impala.thrift.TColumn; import org.apache.impala.thrift.TColumnEncoding; +import org.apache.impala.thrift.TColumnValue; import org.apache.impala.thrift.TExpr; import org.apache.impala.thrift.TExprNode; import org.apache.impala.thrift.TExprNodeType; @@ -189,6 +196,20 @@ public class KuduUtil { } } + public static Long timestampToUnixTimeMicros(Analyzer analyzer, Expr timestampExpr) + throws AnalysisException, InternalException { + Preconditions.checkArgument(timestampExpr.isAnalyzed()); + Preconditions.checkArgument(timestampExpr.isConstant()); + Preconditions.checkArgument(timestampExpr.getType() == Type.TIMESTAMP); + Expr toUnixTimeExpr = new FunctionCallExpr("utc_to_unix_micros", + Lists.newArrayList(timestampExpr)); + toUnixTimeExpr.analyze(analyzer); + TColumnValue result = FeSupport.EvalExprWithoutRow(toUnixTimeExpr, + analyzer.getQueryCtx()); + Preconditions.checkArgument(result.isSetLong_val()); + return result.getLong_val(); + } + public static Encoding fromThrift(TColumnEncoding encoding) throws ImpalaRuntimeException { switch (encoding) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test index bad3299..2ab2ba9 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test @@ -146,15 +146,44 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 select * from functional_kudu.alltypes where tinyint_col is not null and smallint_col is null and -cast(date_string_col as tinyint) is null +cast(date_string_col as tinyint) is null and +timestamp_col > (nanoseconds_add(cast('1987-05-19 00:00:00' as timestamp), 10)) and +timestamp_col < (seconds_add(cast('9999-12-31 24:59:59' as timestamp), 10)) ---- PLAN F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B | 00:SCAN KUDU [functional_kudu.alltypes] - predicates: CAST(date_string_col AS TINYINT) IS NULL - kudu predicates: smallint_col IS NULL, tinyint_col IS NOT NULL + predicates: CAST(date_string_col AS TINYINT) IS NULL, timestamp_col < NULL + kudu predicates: smallint_col IS NULL, tinyint_col IS NOT NULL, timestamp_col > TIMESTAMP '1987-05-19 00:00:00.000000010' mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=126B cardinality=730 ==== +select * from functional_kudu.alltypes where +timestamp_col in (cast('2010-03-01 00:00:00' as timestamp), + cast('2010-03-01 00:01:00' as timestamp)) +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 + PLAN-ROOT SINK + | mem-estimate=0B mem-reservation=0B + | + 00:SCAN KUDU [functional_kudu.alltypes] + kudu predicates: timestamp_col IN (TIMESTAMP '2010-03-01 00:00:00', TIMESTAMP '2010-03-01 00:01:00') + mem-estimate=0B mem-reservation=0B + tuple-ids=0 row-size=97B cardinality=1 +==== +select * from functional_kudu.alltypes where +timestamp_col in (cast('2010-03-01 00:00:00' as timestamp), + null, + cast('2010-03-01 00:01:00' as timestamp)) +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 + PLAN-ROOT SINK + | mem-estimate=0B mem-reservation=0B + | + 00:SCAN KUDU [functional_kudu.alltypes] + predicates: timestamp_col IN (TIMESTAMP '2010-03-01 00:00:00', NULL, TIMESTAMP '2010-03-01 00:01:00') + mem-estimate=0B mem-reservation=0B + tuple-ids=0 row-size=97B cardinality=3 +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24c77f19/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test index 243b7e5..115affa 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test @@ -84,4 +84,59 @@ select count(*) from functional_kudu.alltypes where id is null; 0 ---- TYPES BIGINT -==== \ No newline at end of file +==== +---- QUERY +# Push down TIMESTAMP binary predicates +select id, timestamp_col from functional_kudu.alltypes where +timestamp_col <= cast('2009-01-01 00:08:00.28' as timestamp) and +timestamp_col >= cast('2009-01-01 00:04:00.6' as timestamp) +order by id; +---- RESULTS +4,2009-01-01 00:04:00.600000000 +5,2009-01-01 00:05:00.100000000 +6,2009-01-01 00:06:00.150000000 +7,2009-01-01 00:07:00.210000000 +8,2009-01-01 00:08:00.280000000 +---- TYPES +INT, TIMESTAMP +==== +---- QUERY +# Out-of-range TIMESTAMP predicate (evaluates to NULL) +select id, timestamp_col from functional_kudu.alltypes where +timestamp_col > cast('1000-01-01 00:00:00.00' as timestamp) +---- RESULTS +---- TYPES +INT, TIMESTAMP +==== +---- QUERY +select id, timestamp_col from functional_kudu.alltypes where +timestamp_col < cast('2009-01-01 00:08:00.28' as timestamp) and +timestamp_col > cast('2009-01-01 00:04:00.6' as timestamp) +order by id; +---- RESULTS +5,2009-01-01 00:05:00.100000000 +6,2009-01-01 00:06:00.150000000 +7,2009-01-01 00:07:00.210000000 +---- TYPES +INT, TIMESTAMP +==== +---- QUERY +select id, timestamp_col from functional_kudu.alltypes where +timestamp_col = cast('2009-01-01 00:08:00.28' as timestamp); +---- RESULTS +8,2009-01-01 00:08:00.280000000 +---- TYPES +INT, TIMESTAMP +==== +---- QUERY +# Push down TIMESTAMP IN list predicates +select id, timestamp_col from functional_kudu.alltypes where +timestamp_col in (cast('2010-03-01 00:00:00' as timestamp), + cast('2010-03-01 00:01:00' as timestamp)) +order by id; +---- RESULTS +4240,2010-03-01 00:00:00 +4241,2010-03-01 00:01:00 +---- TYPES +INT, TIMESTAMP +====
