Repository: incubator-impala Updated Branches: refs/heads/master 9c4f75a6a -> 2dcbefc65
IMPALA-5338: Fix Kudu timestamp column default values While support for TIMESTAMP columns in Kudu tables has been committed (IMPALA-5137), that support did not include TIMESTAMP column default values. This change adds CREATE TABLE syntax for specifying the default values, but more importantly it fixes the loading of Kudu tables that may have had default values set on UNIXTIME_MICROS columns, e.g. if the table was created via the Python client. This involves changing KuduColumn to hide the LiteralExpr representing the default value, because that literal is a BIGINT (unix time in microseconds) when the column type is TIMESTAMP. Callers only need the LiteralExpr to call toSql() and getStringValue(), so helper functions (getDefaultValueSql() and getDefaultValueString()) are added to KuduColumn to encapsulate the special logic for TIMESTAMP. TODO: Add support and tests for setting the default value via ALTER TABLE (when IMPALA-4622 is committed). Change-Id: I655910fb4805bb204a999627fa9f68e43ea8aaf2 Reviewed-on: http://gerrit.cloudera.org:8080/6936 Reviewed-by: Matthew Jacobs <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2dcbefc6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2dcbefc6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2dcbefc6 Branch: refs/heads/master Commit: 2dcbefc652ac59d62e83f55a40d4833b364d50be Parents: 9c4f75a Author: Matthew Jacobs <[email protected]> Authored: Mon May 22 18:15:08 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Fri Jun 2 01:47:48 2017 +0000 ---------------------------------------------------------------------- be/src/exec/kudu-scanner.cc | 5 +- be/src/exprs/timestamp-functions-ir.cc | 9 +++ be/src/exprs/timestamp-functions.h | 4 ++ be/src/runtime/timestamp-value.cc | 1 + be/src/runtime/timestamp-value.h | 9 +-- be/src/runtime/timestamp-value.inline.h | 10 ++++ common/function-registry/impala_functions.py | 4 +- 
.../org/apache/impala/analysis/ColumnDef.java | 60 +++++++++++++++---- .../apache/impala/analysis/CreateTableStmt.java | 10 ++-- .../apache/impala/analysis/RangePartition.java | 1 + .../org/apache/impala/analysis/ToSqlUtils.java | 37 +++++++++--- .../org/apache/impala/catalog/KuduColumn.java | 43 +++++++++++-- .../org/apache/impala/planner/KuduScanNode.java | 4 +- .../impala/service/DescribeResultFactory.java | 2 +- .../java/org/apache/impala/util/KuduUtil.java | 18 ++++-- .../apache/impala/analysis/AnalyzeDDLTest.java | 28 ++++++++- .../org/apache/impala/analysis/ToSqlTest.java | 21 +++++++ .../queries/QueryTest/kudu_create.test | 32 ++++++++++ tests/query_test/test_kudu.py | 63 +++++++++++++++++++- 19 files changed, 312 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/be/src/exec/kudu-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc index 8ed160f..3cae4af 100644 --- a/be/src/exec/kudu-scanner.cc +++ b/be/src/exec/kudu-scanner.cc @@ -31,6 +31,7 @@ #include "runtime/runtime-state.h" #include "runtime/row-batch.h" #include "runtime/string-value.h" +#include "runtime/timestamp-value.inline.h" #include "runtime/tuple-row.h" #include "gutil/gscoped_ptr.h" #include "gutil/strings/substitute.h" @@ -207,9 +208,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me } int64_t ts_micros = *reinterpret_cast<int64_t*>( kudu_tuple->GetSlot(slot->tuple_offset())); - int64_t ts_seconds = ts_micros / MICROS_PER_SEC; - int64_t micros_part = ts_micros - (ts_seconds * MICROS_PER_SEC); - TimestampValue tv = TimestampValue::FromUnixTimeMicros(ts_seconds, micros_part); + TimestampValue tv = TimestampValue::FromUnixTimeMicros(ts_micros); if (tv.HasDateAndTime()) { RawValue::Write(&tv, kudu_tuple, slot, NULL); } else { 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/be/src/exprs/timestamp-functions-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/timestamp-functions-ir.cc b/be/src/exprs/timestamp-functions-ir.cc index 7c6f8d5..b1aae03 100644 --- a/be/src/exprs/timestamp-functions-ir.cc +++ b/be/src/exprs/timestamp-functions-ir.cc @@ -128,6 +128,15 @@ BigIntVal TimestampFunctions::UtcToUnixMicros(FunctionContext* context, return (tv.UtcToUnixTimeMicros(&result)) ? BigIntVal(result) : BigIntVal::null(); } +TimestampVal TimestampFunctions::TimestampFromUnixMicros(FunctionContext* context, + const BigIntVal& unix_time_micros) { + if (unix_time_micros.is_null) return TimestampVal::null(); + TimestampValue tv = TimestampValue::FromUnixTimeMicros(unix_time_micros.val); + TimestampVal result; + tv.ToTimestampVal(&result); + return result; +} + TimestampVal TimestampFunctions::ToTimestamp(FunctionContext* context, const BigIntVal& bigint_val) { if (bigint_val.is_null) return TimestampVal::null(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/be/src/exprs/timestamp-functions.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/timestamp-functions.h b/be/src/exprs/timestamp-functions.h index b4c685b..3c48567 100644 --- a/be/src/exprs/timestamp-functions.h +++ b/be/src/exprs/timestamp-functions.h @@ -102,6 +102,10 @@ class TimestampFunctions { static StringVal FromUnix(FunctionContext* context, const TIME& unix_time, const StringVal& fmt); + /// Return a timestamp from a unix time in microseconds. + static TimestampVal TimestampFromUnixMicros(FunctionContext* context, + const BigIntVal& unix_time_micros); + /// Convert a timestamp to or from a particular timezone based time. 
static TimestampVal FromUtc(FunctionContext* context, const TimestampVal& ts_val, const StringVal& tz_string_val); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/be/src/runtime/timestamp-value.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/timestamp-value.cc b/be/src/runtime/timestamp-value.cc index be76db2..32252a0 100644 --- a/be/src/runtime/timestamp-value.cc +++ b/be/src/runtime/timestamp-value.cc @@ -115,6 +115,7 @@ ptime TimestampValue::UnixTimeToPtime(time_t unix_time) { /// Unix times are represented internally in boost as 32 bit ints which limits the /// range of dates to 1901-2038 (https://svn.boost.org/trac/boost/ticket/3109), so /// libc functions will be used instead. + // TODO: Conversion using libc is very expensive (IMPALA-5357); find an alternative. tm temp_tm; if (FLAGS_use_local_tz_for_unix_timestamp_conversions) { if (UNLIKELY(localtime_r(&unix_time, &temp_tm) == nullptr)) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/be/src/runtime/timestamp-value.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h index f0ba1f6..83c38e9 100644 --- a/be/src/runtime/timestamp-value.h +++ b/be/src/runtime/timestamp-value.h @@ -102,13 +102,8 @@ class TimestampValue { return TimestampValue(temp); } - /// Same as FromUnixTime() above, but adds the specified number of microseconds to the - /// resulting TimestampValue. - static TimestampValue FromUnixTimeMicros(time_t unix_time, int64_t micros) { - boost::posix_time::ptime temp = UnixTimeToPtime(unix_time); - temp += boost::posix_time::microseconds(micros); - return TimestampValue(temp); - } + /// Same as FromUnixTime() above, but the unix time is specified in microseconds. 
+ static TimestampValue FromUnixTimeMicros(int64_t unix_time_micros); /// Returns a TimestampValue where the integer part of the specified 'unix_time' /// specifies the number of seconds (see above), and the fractional part is converted http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/be/src/runtime/timestamp-value.inline.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/timestamp-value.inline.h b/be/src/runtime/timestamp-value.inline.h index 7cf4acd..1d14e30 100644 --- a/be/src/runtime/timestamp-value.inline.h +++ b/be/src/runtime/timestamp-value.inline.h @@ -28,6 +28,14 @@ namespace impala { +inline TimestampValue TimestampValue::FromUnixTimeMicros(int64_t unix_time_micros) { + int64_t ts_seconds = unix_time_micros / MICROS_PER_SEC; + int64_t micros_part = unix_time_micros - (ts_seconds * MICROS_PER_SEC); + boost::posix_time::ptime temp = UnixTimeToPtime(ts_seconds); + temp += boost::posix_time::microseconds(micros_part); + return TimestampValue(temp); +} + /// Interpret 'this' as a timestamp in UTC and convert to unix time. /// Returns false if the conversion failed ('unix_time' will be undefined), otherwise /// true. @@ -36,6 +44,7 @@ inline bool TimestampValue::UtcToUnixTime(time_t* unix_time) const { if (UNLIKELY(!HasDateAndTime())) return false; const boost::posix_time::ptime temp(date_, time_); tm temp_tm = boost::posix_time::to_tm(temp); + // TODO: Conversion using libc is very expensive (IMPALA-5357); find an alternative. *unix_time = timegm(&temp_tm); return true; } @@ -75,6 +84,7 @@ inline bool TimestampValue::ToUnixTime(time_t* unix_time) const { if (UNLIKELY(!HasDateAndTime())) return false; const boost::posix_time::ptime temp(date_, time_); tm temp_tm = boost::posix_time::to_tm(temp); + // TODO: Conversion using libc is very expensive (IMPALA-5357); find an alternative. 
if (FLAGS_use_local_tz_for_unix_timestamp_conversions) { *unix_time = mktime(&temp_tm); } else { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/common/function-registry/impala_functions.py ---------------------------------------------------------------------- diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py index 296d705..8f8f07e 100644 --- a/common/function-registry/impala_functions.py +++ b/common/function-registry/impala_functions.py @@ -204,11 +204,13 @@ visible_functions = [ [['nanoseconds_sub'], 'TIMESTAMP', ['TIMESTAMP', 'BIGINT'], '_ZN6impala18TimestampFunctions6AddSubILb0EN10impala_udf9BigIntValEN5boost9date_time18subsecond_durationINS4_10posix_time13time_durationELl1000000000EEELb0EEENS2_12TimestampValEPNS2_15FunctionContextERKSA_RKT0_'], [['datediff'], 'INT', ['TIMESTAMP', 'TIMESTAMP'], '_ZN6impala18TimestampFunctions8DateDiffEPN10impala_udf15FunctionContextERKNS1_12TimestampValES6_'], - [['unix_timestamp'], 'BIGINT', [], '_ZN6impala18TimestampFunctions4UnixEPN10impala_udf15FunctionContextE'], + [['unix_timestamp'], 'BIGINT', [], '_ZN6impala18TimestampFunctions4UnixEPN10impala_udf15FunctionContextE'], [['unix_timestamp'], 'BIGINT', ['TIMESTAMP'], '_ZN6impala18TimestampFunctions4UnixEPN10impala_udf15FunctionContextERKNS1_12TimestampValE'], [['unix_timestamp'], 'BIGINT', ['STRING', 'STRING'], '_ZN6impala18TimestampFunctions4UnixEPN10impala_udf15FunctionContextERKNS1_9StringValES6_', '_ZN6impala18TimestampFunctions22UnixAndFromUnixPrepareEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE', '_ZN6impala18TimestampFunctions20UnixAndFromUnixCloseEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE'], + [['timestamp_from_unix_micros'], 'TIMESTAMP', ['BIGINT'], + '_ZN6impala18TimestampFunctions23TimestampFromUnixMicrosEPN10impala_udf15FunctionContextERKNS1_9BigIntValE'], [['utc_to_unix_micros'], 'BIGINT', ['TIMESTAMP'], 
'_ZN6impala18TimestampFunctions15UtcToUnixMicrosEPN10impala_udf15FunctionContextERKNS1_12TimestampValE'], [['from_unixtime'], 'STRING', ['INT'], http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java index 57e4b51..844d70f 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java +++ b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java @@ -17,6 +17,7 @@ package org.apache.impala.analysis; +import java.math.BigInteger; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -29,7 +30,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; @@ -81,8 +81,17 @@ public class ColumnDef { private String compressionVal_; // Compression algorithm for this column; set in analysis. private CompressionAlgorithm compression_; - // Default value for this column. + + // Default value specified for this column. private Expr defaultValue_; + + // Default value for this column involving any conversions necessary, set during + // analysis. For TIMESTAMP columns, defaultValue_ is a TimestampLiteral and this is an + // IntegerLiteral containing the Unix time in microseconds. For all other column types, + // this is equal to defaultValue_. + // TODO: Remove when Impala supports a 64-bit TIMESTAMP type. + private Expr outputDefaultValue_; + // Desired block size for this column. 
private LiteralExpr blockSize_; @@ -239,27 +248,56 @@ public class ColumnDef { throw new AnalysisException(String.format("Only constant values are allowed " + "for default values: %s", defaultValue_.toSql())); } - defaultValue_ = LiteralExpr.create(defaultValue_, analyzer.getQueryCtx()); - if (defaultValue_ == null) { + LiteralExpr defaultValLiteral = LiteralExpr.create(defaultValue_, + analyzer.getQueryCtx()); + if (defaultValLiteral == null) { throw new AnalysisException(String.format("Only constant values are allowed " + "for default values: %s", defaultValue_.toSql())); } - if (defaultValue_.getType().isNull() && ((isNullable_ != null && !isNullable_) + if (defaultValLiteral.getType().isNull() && ((isNullable_ != null && !isNullable_) || isPrimaryKey_)) { throw new AnalysisException(String.format("Default value of NULL not allowed " + "on non-nullable column: '%s'", getColName())); } - if (!Type.isImplicitlyCastable(defaultValue_.getType(), type_, true)) { + + // Special case string literals in timestamp columns for convenience. 
+ if (defaultValLiteral.getType().isStringType() && type_.isTimestamp()) { + // Add an explicit cast to TIMESTAMP + Expr e = new CastExpr(new TypeDef(Type.TIMESTAMP), defaultValLiteral); + e.analyze(analyzer); + defaultValLiteral = LiteralExpr.create(e, analyzer.getQueryCtx()); + Preconditions.checkNotNull(defaultValLiteral); + if (defaultValLiteral.isNullLiteral()) { + throw new AnalysisException(String.format("String %s cannot be cast " + + "to a TIMESTAMP literal.", defaultValue_.toSql())); + } + } + + if (!Type.isImplicitlyCastable(defaultValLiteral.getType(), type_, true)) { throw new AnalysisException(String.format("Default value %s (type: %s) " + "is not compatible with column '%s' (type: %s).", defaultValue_.toSql(), defaultValue_.getType().toSql(), colName_, type_.toSql())); } - if (!defaultValue_.getType().equals(type_)) { - Expr castLiteral = defaultValue_.uncheckedCastTo(type_); + if (!defaultValLiteral.getType().equals(type_)) { + Expr castLiteral = defaultValLiteral.uncheckedCastTo(type_); Preconditions.checkNotNull(castLiteral); - defaultValue_ = LiteralExpr.create(castLiteral, analyzer.getQueryCtx()); + defaultValLiteral = LiteralExpr.create(castLiteral, analyzer.getQueryCtx()); + } + Preconditions.checkNotNull(defaultValLiteral); + outputDefaultValue_ = defaultValLiteral; + + // TODO: Remove when Impala supports a 64-bit TIMESTAMP type. + if (type_.isTimestamp()) { + try { + long unixTimeMicros = KuduUtil.timestampToUnixTimeMicros(analyzer, + defaultValLiteral); + outputDefaultValue_ = new NumericLiteral(BigInteger.valueOf(unixTimeMicros), + Type.BIGINT); + } catch (Exception e) { + throw new AnalysisException(String.format( + "%s cannot be cast to a TIMESTAMP literal.", defaultValue_.toSql()), e); + } } - Preconditions.checkNotNull(defaultValue_); } // Analyze the block size value, if any. @@ -315,7 +353,7 @@ public class ColumnDef { Integer blockSize = blockSize_ == null ? 
null : (int) ((NumericLiteral) blockSize_).getIntValue(); KuduUtil.setColumnOptions(col, isPrimaryKey_, isNullable_, encoding_, - compression_, defaultValue_, blockSize); + compression_, outputDefaultValue_, blockSize); if (comment_ != null) col.setComment(comment_); return col; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java index cba821e..6010eb6 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java @@ -48,8 +48,7 @@ public class CreateTableStmt extends StatementBase { @VisibleForTesting final static String KUDU_STORAGE_HANDLER_ERROR_MESSAGE = "Kudu tables must be" - + " specified using 'STORED AS KUDU' without using the storage handler table" - + " property."; + + " specified using 'STORED AS KUDU'."; // Table parameters specified in a CREATE TABLE statement private final TableDef tableDef_; @@ -211,8 +210,11 @@ public class CreateTableStmt extends StatementBase { * Kudu tables. */ private void analyzeKuduTableProperties(Analyzer analyzer) throws AnalysisException { - if (getTblProperties().containsKey(KuduTable.KEY_STORAGE_HANDLER)) { - throw new AnalysisException(KUDU_STORAGE_HANDLER_ERROR_MESSAGE); + // Only the Kudu storage handler may be specified for Kudu tables. 
+ String handler = getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER); + if (handler != null && !handler.equals(KuduTable.KUDU_STORAGE_HANDLER)) { + throw new AnalysisException("Invalid storage handler specified for Kudu table: " + + handler); } getTblProperties().put(KuduTable.KEY_STORAGE_HANDLER, KuduTable.KUDU_STORAGE_HANDLER); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/fe/src/main/java/org/apache/impala/analysis/RangePartition.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java b/fe/src/main/java/org/apache/impala/analysis/RangePartition.java index 5ff00c7..b45a785 100644 --- a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java +++ b/fe/src/main/java/org/apache/impala/analysis/RangePartition.java @@ -198,6 +198,7 @@ public class RangePartition implements ParseNode { } Preconditions.checkNotNull(literal); + // TODO: Remove when Impala supports a 64-bit TIMESTAMP type. if (colType.isTimestamp()) { try { long unixTimeMicros = KuduUtil.timestampToUnixTimeMicros(analyzer, literal); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java index 72bde98..68f9ec3 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java +++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java @@ -79,6 +79,21 @@ public class ToSqlUtils { } /** + * Returns a comma-delimited string of Kudu 'partition by' parameters on a + * CreateTableStmt, or null if this isn't a CreateTableStmt for a Kudu table. 
+ */ + private static String getKuduPartitionByParams(CreateTableStmt stmt) { + List<KuduPartitionParam> partitionParams = stmt.getKuduPartitionParams(); + Preconditions.checkNotNull(partitionParams); + if (partitionParams.isEmpty()) return null; + List<String> paramStrings = Lists.newArrayList(); + for (KuduPartitionParam p : partitionParams) { + paramStrings.add(p.toSql()); + } + return Joiner.on(", ").join(paramStrings); + } + + /** * Given an unquoted identifier string, returns an identifier lexable by * Impala and Hive, possibly by enclosing the original identifier in "`" quotes. * For example, Hive cannot parse its own auto-generated column @@ -146,11 +161,12 @@ public class ToSqlUtils { for (ColumnDef col: stmt.getPartitionColumnDefs()) { partitionColsSql.add(col.toString()); } + String kuduParamsSql = getKuduPartitionByParams(stmt); // TODO: Pass the correct compression, if applicable. return getCreateTableSql(stmt.getDb(), stmt.getTbl(), stmt.getComment(), colsSql, - partitionColsSql, stmt.getTblPrimaryKeyColumnNames(), null, stmt.getSortColumns(), - stmt.getTblProperties(), stmt.getSerdeProperties(), stmt.isExternal(), - stmt.getIfNotExists(), stmt.getRowFormat(), + partitionColsSql, stmt.getTblPrimaryKeyColumnNames(), kuduParamsSql, + stmt.getSortColumns(), stmt.getTblProperties(), stmt.getSerdeProperties(), + stmt.isExternal(), stmt.getIfNotExists(), stmt.getRowFormat(), HdfsFileFormat.fromThrift(stmt.getFileFormat()), HdfsCompression.NONE, null, stmt.getLocation()); } @@ -169,11 +185,12 @@ public class ToSqlUtils { } HashMap<String, String> properties = Maps.newHashMap(innerStmt.getTblProperties()); removeHiddenTableProperties(properties); + String kuduParamsSql = getKuduPartitionByParams(innerStmt); // TODO: Pass the correct compression, if applicable. 
String createTableSql = getCreateTableSql(innerStmt.getDb(), innerStmt.getTbl(), innerStmt.getComment(), null, partitionColsSql, - innerStmt.getTblPrimaryKeyColumnNames(), null, innerStmt.getSortColumns(), - properties, innerStmt.getSerdeProperties(), + innerStmt.getTblPrimaryKeyColumnNames(), kuduParamsSql, + innerStmt.getSortColumns(), properties, innerStmt.getSerdeProperties(), innerStmt.isExternal(), innerStmt.getIfNotExists(), innerStmt.getRowFormat(), HdfsFileFormat.fromThrift(innerStmt.getFileFormat()), HdfsCompression.NONE, null, innerStmt.getLocation()); @@ -282,6 +299,12 @@ public class ToSqlUtils { Joiner.on(", ").appendTo(sb, primaryKeysSql).append(")"); } sb.append("\n)"); + } else { + // CTAS for Kudu tables still print the primary key + if (primaryKeysSql != null && !primaryKeysSql.isEmpty()) { + sb.append("\n PRIMARY KEY ("); + Joiner.on(", ").appendTo(sb, primaryKeysSql).append(")"); + } } sb.append("\n"); @@ -384,8 +407,8 @@ public class ToSqlUtils { if (kuduCol.getCompression() != null) { sb.append(" COMPRESSION " + kuduCol.getCompression()); } - if (kuduCol.getDefaultValue() != null) { - sb.append(" DEFAULT " + kuduCol.getDefaultValue().toSql()); + if (kuduCol.hasDefaultValue()) { + sb.append(" DEFAULT " + kuduCol.getDefaultValueSql()); } if (kuduCol.getBlockSize() != 0) { sb.append(String.format(" BLOCK_SIZE %d", kuduCol.getBlockSize())); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java index 5640748..dda63c1 100644 --- a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java +++ b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java @@ -44,13 +44,21 @@ public class KuduColumn extends Column { private final boolean isNullable_; private final Encoding 
encoding_; private final CompressionAlgorithm compression_; - private final LiteralExpr defaultValue_; private final int blockSize_; + // Default value for this column. The expr is a literal of the target column type + // post-analysis. For TIMESTAMPs those are BIGINT values storing the unix time in + // microseconds. Code that references this may need to handle TIMESTAMP specially. + // For that reason, this isn't exposed publicly, e.g. getDefaultValueSql() is used + // to hide this complexity externally. + private final LiteralExpr defaultValue_; + private KuduColumn(String name, Type type, boolean isKey, boolean isNullable, Encoding encoding, CompressionAlgorithm compression, LiteralExpr defaultValue, int blockSize, String comment, int position) { super(name, type, comment, position); + Preconditions.checkArgument(defaultValue == null || type == defaultValue.getType() + || (type.isTimestamp() && defaultValue.getType().isIntegerType())); isKey_ = isKey; isNullable_ = isNullable; encoding_ = encoding; @@ -65,8 +73,9 @@ public class KuduColumn extends Column { Object defaultValue = colSchema.getDefaultValue(); LiteralExpr defaultValueExpr = null; if (defaultValue != null) { + Type defaultValueType = type.isTimestamp() ? Type.BIGINT : type; try { - defaultValueExpr = LiteralExpr.create(defaultValue.toString(), type); + defaultValueExpr = LiteralExpr.create(defaultValue.toString(), defaultValueType); } catch (AnalysisException e) { throw new ImpalaRuntimeException(String.format("Error parsing default value: " + "'%s'", defaultValue), e); @@ -92,8 +101,10 @@ public class KuduColumn extends Column { } LiteralExpr defaultValue = null; if (column.isSetDefault_value()) { + Type defaultValueType = columnType.isTimestamp() ? 
Type.BIGINT : columnType; defaultValue = - LiteralExpr.fromThrift(column.getDefault_value().getNodes().get(0), columnType); + LiteralExpr.fromThrift(column.getDefault_value().getNodes().get(0), + defaultValueType); } int blockSize = 0; if (column.isSetBlock_size()) blockSize = column.getBlock_size(); @@ -106,9 +117,31 @@ public class KuduColumn extends Column { public boolean isNullable() { return isNullable_; } public Encoding getEncoding() { return encoding_; } public CompressionAlgorithm getCompression() { return compression_; } - public LiteralExpr getDefaultValue() { return defaultValue_; } - public boolean hasDefaultValue() { return defaultValue_ != null; } public int getBlockSize() { return blockSize_; } + public boolean hasDefaultValue() { return defaultValue_ != null; } + + /** + * Returns a SQL string representation of the default value. Similar to calling + * LiteralExpr.toSql(), but this handles TIMESTAMPs specially because + * TIMESTAMP default values are stored as BIGINTs representing unix time in + * microseconds. For TIMESTAMP columns, the returned string is the function to + * convert unix times in microseconds to TIMESTAMPs with the value as its parameter. + */ + public String getDefaultValueSql() { + if (!hasDefaultValue()) return null; + if (!type_.isTimestamp()) return defaultValue_.toSql(); + return "timestamp_from_unix_micros(" + defaultValue_.getStringValue() + ")"; + } + + /** + * Returns a string representation of the default value. This calls getStringValue() + * but is exposed so defaultValue_ can be encapsulated as it has special handling for + * TIMESTAMP column types. 
+ */ + public String getDefaultValueString() { + if (!hasDefaultValue()) return null; + return defaultValue_.getStringValue(); + } @Override public TColumn toThrift() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index 4156a72..c03307e 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -37,9 +37,7 @@ import org.apache.impala.analysis.StringLiteral; import org.apache.impala.analysis.TupleDescriptor; import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.Type; -import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaRuntimeException; -import org.apache.impala.common.InternalException; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TKuduScanNode; import org.apache.impala.thrift.TNetworkAddress; @@ -392,6 +390,7 @@ public class KuduScanNode extends ScanNode { } case TIMESTAMP: { try { + // TODO: Simplify when Impala supports a 64-bit TIMESTAMP type. kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, KuduUtil.timestampToUnixTimeMicros(analyzer, literal)); } catch (Exception e) { @@ -492,6 +491,7 @@ public class KuduScanNode extends ScanNode { case STRING: return ((StringLiteral) e).getValue(); case TIMESTAMP: { try { + // TODO: Simplify when Impala supports a 64-bit TIMESTAMP type. 
return KuduUtil.timestampToUnixTimeMicros(analyzer, e); } catch (Exception ex) { LOG.info("Exception converting Kudu timestamp expr: " + e.toSql(), ex); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java b/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java index 05e05f9..5e90b06 100644 --- a/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java +++ b/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java @@ -274,7 +274,7 @@ public class DescribeResultFactory { nullableCol.setString_val(Boolean.toString(kuduColumn.isNullable())); TColumnValue defaultValCol = new TColumnValue(); if (kuduColumn.hasDefaultValue()) { - defaultValCol.setString_val(kuduColumn.getDefaultValue().getStringValue()); + defaultValCol.setString_val(kuduColumn.getDefaultValueString()); } else { defaultValCol.setString_val(""); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/fe/src/main/java/org/apache/impala/util/KuduUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java index 645866b..c615d06 100644 --- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java +++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java @@ -19,7 +19,6 @@ package org.apache.impala.util; import static java.lang.String.format; -import java.util.HashSet; import java.util.List; import org.apache.impala.analysis.Analyzer; @@ -29,7 +28,6 @@ import org.apache.impala.analysis.FunctionCallExpr; import org.apache.impala.analysis.InsertStmt; import org.apache.impala.analysis.KuduPartitionExpr; import org.apache.impala.analysis.LiteralExpr; -import 
org.apache.impala.analysis.NumericLiteral; import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.Type; @@ -56,9 +54,7 @@ import org.apache.kudu.client.PartialRow; import org.apache.kudu.client.RangePartitionBound; import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; public class KuduUtil { @@ -168,6 +164,12 @@ public class KuduUtil { } } + /** + * Returns the actual value of the specified defaultValue literal. The returned type is + * the value type stored by Kudu for the column. E.g. if 'type' is 'INT8', the returned + * value is a Java byte, and if 'type' is 'UNIXTIME_MICROS', the returned value is + * a Java long. + */ public static Object getKuduDefaultValue(TExpr defaultValue, org.apache.kudu.Type type, String colName) throws ImpalaRuntimeException { Preconditions.checkState(defaultValue.getNodes().size() == 1); @@ -198,6 +200,9 @@ public class KuduUtil { case BOOL: checkCorrectType(literal.isSetBool_literal(), type, colName, literal); return literal.getBool_literal().isValue(); + case UNIXTIME_MICROS: + checkCorrectType(literal.isSetInt_literal(), type, colName, literal); + return literal.getInt_literal().getValue(); default: throw new ImpalaRuntimeException("Unsupported value for column type: " + type.toString()); @@ -214,7 +219,10 @@ public class KuduUtil { toUnixTimeExpr.analyze(analyzer); TColumnValue result = FeSupport.EvalExprWithoutRow(toUnixTimeExpr, analyzer.getQueryCtx()); - Preconditions.checkArgument(result.isSetLong_val()); + if (!result.isSetLong_val()) { + throw new InternalException("Error converting timestamp expression: " + + timestampExpr.debugString()); + } return result.getLong_val(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java 
---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java index 1a8cba5..e5ba4bc 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java @@ -2165,7 +2165,7 @@ public class AnalyzeDDLTest extends FrontendTestBase { CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE); AnalysisError("create table tab (x int primary key) stored as kudu tblproperties (" + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler')", - CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE); + "Table partitioning must be specified for managed Kudu tables."); // Invalid value for number of replicas AnalysisError("create table t (x int primary key) stored as kudu tblproperties (" + "'kudu.num_tablet_replicas'='1.1')", @@ -2340,6 +2340,30 @@ public class AnalyzeDDLTest extends FrontendTestBase { "partition by range (partition 100 <= VALUES < 200) stored as kudu", "Range partition value 100 (type: TINYINT) is not type " + "compatible with partitioning column 'ts' (type: TIMESTAMP)."); + + // TIMESTAMP columns with default values + AnalyzesOk("create table tdefault (id int primary key, ts timestamp default now())" + + "partition by hash(id) partitions 3 stored as kudu"); + AnalyzesOk("create table tdefault (id int primary key, ts timestamp default " + + "timestamp_from_unix_micros(1230768000000000)) partition by hash(id) " + + "partitions 3 stored as kudu"); + AnalyzesOk("create table tdefault (id int primary key, " + + "ts timestamp not null default '2009-01-01 00:00:00') " + + "partition by hash(id) partitions 3 stored as kudu"); + AnalyzesOk("create table tdefault (id int primary key, " + + "ts timestamp not null default cast('2009-01-01 00:00:00' as timestamp)) " + + "partition by hash(id) partitions 3 stored as kudu"); + AnalysisError("create 
table tdefault (id int primary key, ts timestamp " + + "default null) partition by hash(id) partitions 3 stored as kudu", + "NULL cannot be cast to a TIMESTAMP literal."); + AnalysisError("create table tdefault (id int primary key, " + + "ts timestamp not null default cast('00:00:00' as timestamp)) " + + "partition by hash(id) partitions 3 stored as kudu", + "CAST('00:00:00' AS TIMESTAMP) cannot be cast to a TIMESTAMP literal."); + AnalysisError("create table tdefault (id int primary key, " + + "ts timestamp not null default '2009-1 foo') " + + "partition by hash(id) partitions 3 stored as kudu", + "String '2009-1 foo' cannot be cast to a TIMESTAMP literal."); } @Test @@ -2369,7 +2393,7 @@ public class AnalyzeDDLTest extends FrontendTestBase { "'kudu.table_name'='t')", CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE); AnalysisError("create external table t stored as kudu tblproperties (" + "'storage_handler'='foo', 'kudu.table_name'='t')", - CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE); + "Invalid storage handler specified for Kudu table: foo"); // Cannot specify the number of replicas for external Kudu tables AnalysisError("create external table tab (x int) stored as kudu " + "tblproperties ('kudu.num_tablet_replicas' = '1', " + http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java index 3094df4..6660aba 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java @@ -306,6 +306,16 @@ public class ToSqlTest extends FrontendTestBase { "default", "CREATE TABLE default.p ( a INT, b INT ) PARTITIONED BY ( day STRING ) " + "SORT BY ( a, b ) STORED AS TEXTFILE" , true); + // Kudu table with a TIMESTAMP column 
default value + testToSql("create table p (a bigint primary key, b timestamp default '1987-05-19') " + + "partition by hash(a) partitions 3 stored as kudu " + + "tblproperties ('kudu.master_addresses'='foo')", + "default", + "CREATE TABLE default.p ( a BIGINT PRIMARY KEY, b TIMESTAMP " + + "DEFAULT '1987-05-19' ) PARTITION BY HASH (a) PARTITIONS 3 " + + "STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses'='foo', " + + "'kudu.table_name'='impala::default.p', " + + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler')", true); } @Test @@ -328,6 +338,17 @@ public class ToSqlTest extends FrontendTestBase { "CREATE TABLE default.p PARTITIONED BY ( int_col ) SORT BY ( string_col ) " + "STORED AS TEXTFILE AS SELECT double_col, string_col, int_col FROM " + "functional.alltypes", true); + // Kudu table with multiple partition params + testToSql("create table p primary key (a,b) partition by hash(a) partitions 3, " + + "range (b) (partition value = 1) stored as kudu " + + "tblproperties ('kudu.master_addresses'='foo') as select int_col a, bigint_col " + + "b from functional.alltypes", + "default", + "CREATE TABLE default.p PRIMARY KEY (a, b) PARTITION BY HASH (a) PARTITIONS 3, " + + "RANGE (b) (PARTITION VALUE = 1) STORED AS KUDU TBLPROPERTIES " + + "('kudu.master_addresses'='foo', 'kudu.table_name'='impala::default.p', " + + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler') AS " + + "SELECT int_col a, bigint_col b FROM functional.alltypes", true); } @Test http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test index 704c868..4aaed16 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test +++ 
b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test @@ -216,3 +216,35 @@ select * from ts_ranges_ctas order by id ---- TYPES TIMESTAMP,INT ==== +---- QUERY +# Creates a Kudu table with timestamp column default values. +create table ts_default (i int primary key, ts1 timestamp, + ts2 timestamp default cast('2009-01-01 00:00:00' as timestamp)) +partition by hash(i) partitions 3 stored as kudu +---- RESULTS +==== +---- QUERY +insert into ts_default (i) values (1); +---- RUNTIME_PROFILE +NumModifiedRows: 1 +NumRowErrors: 0 +---- LABELS +I, TS1, TS2 +---- DML_RESULTS: ts_default +1,NULL,2009-01-01 00:00:00 +---- TYPES +INT,TIMESTAMP,TIMESTAMP +==== +---- QUERY +insert into ts_default (i, ts1, ts2) values (2, NULL, NULL); +---- RUNTIME_PROFILE +NumModifiedRows: 1 +NumRowErrors: 0 +---- LABELS +I, TS1, TS2 +---- DML_RESULTS: ts_default +1,NULL,2009-01-01 00:00:00 +2,NULL,NULL +---- TYPES +INT,TIMESTAMP,TIMESTAMP +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2dcbefc6/tests/query_test/test_kudu.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py index 7bfdf4b..8f98e3e 100644 --- a/tests/query_test/test_kudu.py +++ b/tests/query_test/test_kudu.py @@ -360,6 +360,38 @@ class TestKuduOperations(KuduTestSuite): class TestCreateExternalTable(KuduTestSuite): + def test_external_timestamp_default_value(self, cursor, kudu_client, unique_database): + """Checks that a Kudu table created outside Impala with a default value on a + UNIXTIME_MICROS column can be loaded by Impala, and validates the DESCRIBE + output is correct.""" + schema_builder = SchemaBuilder() + column_spec = schema_builder.add_column("id", INT64) + column_spec.nullable(False) + column_spec = schema_builder.add_column("ts", UNIXTIME_MICROS) + column_spec.default(datetime(2009, 1, 1, 0, 0, tzinfo=utc)) + schema_builder.set_primary_keys(["id"]) + schema = schema_builder.build() + 
name = unique_database + ".tsdefault" + + try: + kudu_client.create_table(name, schema, + partitioning=Partitioning().set_range_partition_columns(["id"])) + kudu_table = kudu_client.table(name) + impala_table_name = self.get_kudu_table_base_name(kudu_table.name) + props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name + cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (impala_table_name, + props)) + with self.drop_impala_table_after_context(cursor, impala_table_name): + cursor.execute("DESCRIBE %s" % impala_table_name) + table_desc = [[col.strip() if col else col for col in row] for row in cursor] + # Pytest shows truncated output on failure, so print the details just in case. + LOG.info(table_desc) + assert ["ts", "timestamp", "", "false", "true", "1230768000000000", \ + "AUTO_ENCODING", "DEFAULT_COMPRESSION", "0"] in table_desc + finally: + if kudu_client.table_exists(name): + kudu_client.delete_table(name) + def test_implicit_table_props(self, cursor, kudu_client): """Check that table properties added internally during table creation are as expected. @@ -615,6 +647,36 @@ class TestShowCreateTable(KuduTestSuite): TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format( db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)) + def test_timestamp_default_value(self, cursor): + create_sql_fmt = """ + CREATE TABLE {table} (c INT, d TIMESTAMP, + e TIMESTAMP NULL DEFAULT CAST('%s' AS TIMESTAMP), + PRIMARY KEY(c, d)) + PARTITION BY HASH(c) PARTITIONS 3 + STORED AS KUDU""" + # Long lines are unfortunate, but extra newlines will break the test. 
+ show_create_sql_fmt = """ + CREATE TABLE {db}.{{table}} ( + c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, + d TIMESTAMP NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, + e TIMESTAMP NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION DEFAULT timestamp_from_unix_micros(%s), + PRIMARY KEY (c, d) + ) + PARTITION BY HASH (c) PARTITIONS 3 + STORED AS KUDU + TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format( + db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS) + + self.assert_show_create_equals(cursor, + create_sql_fmt % ("2009-01-01 00:00:00.000001000"), + show_create_sql_fmt % ("1230768000000001")) + self.assert_show_create_equals(cursor, + create_sql_fmt % ("2009-01-01 00:00:00.000001001"), + show_create_sql_fmt % ("1230768000000001")) + self.assert_show_create_equals(cursor, + create_sql_fmt % ("2009-01-01 00:00:00.000000999"), + show_create_sql_fmt % ("1230768000000001")) + def test_properties(self, cursor): # If an explicit table name is used for the Kudu table and it differs from what # would be the default Kudu table name, the name should be shown as a table property. @@ -772,7 +834,6 @@ class TestImpalaKuduIntegration(KuduTestSuite): cursor.execute("SHOW TABLES") assert (impala_table_name,) not in cursor.fetchall() - def test_delete_managed_kudu_table(self, cursor, kudu_client, unique_database): """Check that dropping a managed Kudu table works even if the underlying Kudu table has been dropped externally."""
