This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 1b6395b8d IMPALA-13627: Handle legacy Hive timezone conversion
1b6395b8d is described below
commit 1b6395b8db09d271bd166bf501bdf7038d8be644
Author: Michael Smith <[email protected]>
AuthorDate: Thu Dec 19 14:14:03 2024 -0800
IMPALA-13627: Handle legacy Hive timezone conversion
After HIVE-12191, Hive has 2 different methods of calculating timestamp
conversion from UTC to local timezone. When Impala has
convert_legacy_hive_parquet_utc_timestamps=true, it assumes times
written by Hive are in UTC and converts them to local time using tzdata,
which matches the newer method introduced by HIVE-12191.
Some dates convert differently between the two methods, such as
Asia/Kuala_Lumpur or Singapore prior to 1982 (also seen in HIVE-24074).
After HIVE-25104, Hive writes 'writer.zone.conversion.legacy' to
distinguish which method is being used. As a result there are three
different cases we have to handle:
1. Hive prior to 3.1 used what’s now called “legacy conversion” using
SimpleDateFormat.
2. Hive 3.1.2 (with HIVE-21290) used a new Java API that’s based on
tzdata and added metadata to identify the timezone.
3. Hive 4 supports both, and added new file metadata to identify which one was used.
Adds handling for Hive files (identified by created_by=parquet-mr) where
we can infer the correct handling from Parquet file metadata:
1. if writer.zone.conversion.legacy is present (Hive 4), use it to
determine whether to use a legacy conversion method compatible with
Hive's legacy behavior, or convert using tzdata.
2. if writer.zone.conversion.legacy is not present but writer.time.zone
is, we can infer it was written by Hive 3.1.2+ using new APIs.
3. otherwise it was likely written by an earlier Hive version.
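The three cases above can be sketched as a small stand-alone function. This is
only an illustration under stated assumptions: KeyValueMetadata, IsTrue and
UseHiveLegacyConversion are hypothetical names, not Impala's actual classes,
and the sketch only covers the decision made once Impala has already
determined that a UTC-to-local conversion is needed.

```cpp
#include <algorithm>
#include <cctype>
#include <map>
#include <string>

// Hypothetical stand-in for the key/value metadata in a Parquet file footer.
using KeyValueMetadata = std::map<std::string, std::string>;

// Case-insensitive check for the string "true".
bool IsTrue(std::string value) {
  std::transform(value.begin(), value.end(), value.begin(),
      [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
  return value == "true";
}

// Returns true if the legacy (SimpleDateFormat-compatible) conversion should be
// used. 'option_default' models the use_legacy_hive_timestamp_conversion option.
bool UseHiveLegacyConversion(const KeyValueMetadata& meta, bool option_default) {
  // Case 1: Hive 4 records the conversion method explicitly.
  auto legacy = meta.find("writer.zone.conversion.legacy");
  if (legacy != meta.end() && !legacy->second.empty()) {
    return IsTrue(legacy->second);
  }
  // Case 2: only the writer time zone is present, so infer Hive 3.1.2+ (tzdata).
  auto zone = meta.find("writer.time.zone");
  if (zone != meta.end() && !zone->second.empty()) return false;
  // Case 3: no relevant metadata; fall back to the configured option.
  return option_default;
}
```

With this shape, the query option only matters for files that carry neither
metadata key, which matches the third case described above.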
Adds a new CLI and query option - use_legacy_hive_timestamp_conversion -
to select what conversion method to use in the 3rd case above, when
Impala determines that the file was written by Hive older than 3.1.2.
Defaults to false to minimize changes in Impala's behavior and because
going through JNI is ~50x slower even when the results would not differ;
Hive defaults to true for its equivalent setting:
hive.parquet.timestamp.legacy.conversion.enabled.
Hive legacy-compatible conversion uses a Java method that would be
complicated to mimic in C++, doing
DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
formatter.setTimeZone(TimeZone.getTimeZone(timezone_string));
java.util.Date date = formatter.parse(date_time_string);
formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
return formatter.format(date);
IMPALA-9385 added a check against a Timezone pointer in
FromUnixTimestamp. That dominates the time in FromUnixTimeNanos,
overriding any benchmark gains from IMPALA-7417. Moves FromUnixTime to
allow inlining, and switches to using UTCPTR in the benchmark - as
IMPALA-9385 did in most other code - to restore benchmark results.
Testing:
- Adds JVM conversion method to convert-timestamp-benchmark.
- Adds tests for several cases from Hive conversion tests.
Change-Id: I1271ed1da0b74366ab8315e7ec2d4ee47111e067
Reviewed-on: http://gerrit.cloudera.org:8080/22293
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Csaba Ringhofer <[email protected]>
---
be/src/benchmarks/convert-timestamp-benchmark.cc | 144 ++++++++++++---------
be/src/benchmarks/date-benchmark.cc | 2 +
be/src/exec/parquet/hdfs-parquet-scanner.cc | 74 ++++++++---
be/src/exec/parquet/hdfs-parquet-scanner.h | 27 ++--
be/src/exec/parquet/parquet-common.cc | 23 +++-
be/src/exec/parquet/parquet-common.h | 13 +-
be/src/runtime/raw-value-test.cc | 2 +
be/src/runtime/timestamp-value.cc | 39 ++++--
be/src/runtime/timestamp-value.h | 5 +
be/src/runtime/timestamp-value.inline.h | 9 ++
be/src/service/frontend.cc | 12 +-
be/src/service/frontend.h | 5 +
be/src/service/query-options.cc | 4 +
be/src/service/query-options.h | 4 +-
common/thrift/Frontend.thrift | 14 ++
common/thrift/ImpalaService.thrift | 9 ++
common/thrift/Query.thrift | 3 +
docs/topics/impala_timestamp.xml | 37 ++++++
.../org/apache/impala/service/JniFrontend.java | 27 ++++
.../data/employee_hive_3_1_3_us_pacific.parquet | Bin 0 -> 446 bytes
testdata/data/hive_kuala_lumpur_legacy.parquet | Bin 0 -> 507 bytes
testdata/data/tbl_parq1/000000_0 | Bin 0 -> 286 bytes
testdata/data/tbl_parq1/000000_1 | Bin 0 -> 286 bytes
testdata/data/tbl_parq1/000000_2 | Bin 0 -> 327 bytes
.../QueryTest/timestamp-conversion-hive-313.test | 46 +++++++
.../QueryTest/timestamp-conversion-hive-3m.test | 34 +++++
.../QueryTest/timestamp-conversion-hive-4.test | 28 ++++
tests/query_test/test_hive_timestamp_conversion.py | 76 +++++++++++
28 files changed, 530 insertions(+), 107 deletions(-)
diff --git a/be/src/benchmarks/convert-timestamp-benchmark.cc b/be/src/benchmarks/convert-timestamp-benchmark.cc
index a39829ab7..18b14fd47 100644
--- a/be/src/benchmarks/convert-timestamp-benchmark.cc
+++ b/be/src/benchmarks/convert-timestamp-benchmark.cc
@@ -37,8 +37,11 @@
#include "exprs/timezone_db.h"
#include "exprs/timestamp-functions.h"
#include "runtime/datetime-simple-date-format-parser.h"
+#include "runtime/test-env.h"
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
+#include "service/fe-support.h"
+#include "service/frontend.h"
#include "util/benchmark.h"
#include "util/cpu-info.h"
#include "util/pretty-printer.h"
@@ -49,118 +52,120 @@
using std::random_device;
using std::mt19937;
using std::uniform_int_distribution;
-using std::thread;
using namespace impala;
using namespace datetime_parse_util;
// Benchmark tests for timestamp time-zone conversions
/*
-Machine Info: Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
+Machine Info: 12th Gen Intel(R) Core(TM) i9-12900K
UtcToUnixTime: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
(relative) (relative) (relative)
---------------------------------------------------------------------------------------------------------
- (glibc)  7.98  8.14  8.3  1X  1X  1X
- (Google/CCTZ)  17.9  18.2  18.5  2.24X  2.24X  2.23X
- (boost)  301  306  311  37.7X  37.5X  37.5X
+ (glibc)  12  12.1  12.2  1X  1X  1X
+ (Google/CCTZ)  25.2  25.5  25.5  2.1X  2.11X  2.09X
+ (boost)  635  643  646  53X  53.3X  52.9X
LocalToUnixTime: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
(relative) (relative) (relative)
---------------------------------------------------------------------------------------------------------
- (glibc)  0.717  0.732  0.745  1X  1X  1X
- (Google/CCTZ)  15.3  15.5  15.8  21.3X  21.2X  21.2X
+ (glibc)  0.739  0.745  0.745  1X  1X  1X
+ (Google/CCTZ)  23.2  23.4  23.7  31.4X  31.4X  31.7X
FromUtc: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
(relative) (relative) (relative)
---------------------------------------------------------------------------------------------------------
- (boost)  1.6  1.63  1.67  1X  1X  1X
- (Google/CCTZ)  14.5  14.8  15.2  9.06X  9.09X  9.11X
+ (boost)  2.59  2.6  2.6  1X  1X  1X
+ (Google/CCTZ)  24.4  24.7  24.9  9.45X  9.5X  9.58X
ToUtc: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
(relative) (relative) (relative)
---------------------------------------------------------------------------------------------------------
- (boost)  1.63  1.67  1.68  1X  1X  1X
- (Google/CCTZ)  8.7  8.9  9.05  5.34X  5.34X  5.38X
+ (boost)  2.6  2.64  2.65  1X  1X  1X
+ (Google/CCTZ)  13.5  13.6  13.7  5.2X  5.16X  5.18X
UtcToLocal: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
(relative) (relative) (relative)
---------------------------------------------------------------------------------------------------------
- (glibc)  2.68  2.75  2.8  1X  1X  1X
- (Google/CCTZ)  15  15.2  15.5  5.59X  5.55X  5.53X
+ (glibc)  4.42  4.42  4.44  1X  1X  1X
+ (Google/CCTZ)  25.6  26.6  27  5.78X  6.01X  6.08X
+ (JVM)  0.511  0.596  0.6  0.115X  0.135X  0.135X
UnixTimeToLocalPtime: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
(relative) (relative) (relative)
---------------------------------------------------------------------------------------------------------
- (glibc)  2.69  2.75  2.8  1X  1X  1X
- (Google/CCTZ)  14.8  15.1  15.4  5.5X  5.49X  5.52X
+ (glibc)  4.51  4.53  4.53  1X  1X  1X
+ (Google/CCTZ)  23.7  24.1  24.4  5.25X  5.32X  5.4X
UnixTimeToUtcPtime: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
(relative) (relative) (relative)
---------------------------------------------------------------------------------------------------------
- (glibc)  17  17.6  17.9  1X  1X  1X
- (Google/CCTZ)  6.45  6.71  6.81  0.379X  0.382X  0.38X
- (fast path)  25.1  26  26.4  1.47X  1.48X  1.48X
- (day split)  48.6  50.3  51.3  2.85X  2.87X  2.86X
+ (glibc)  24.8  25.1  25.6  1X  1X  1X
+ (Google/CCTZ)  13.9  14  14.1  0.562X  0.557X  0.553X
+ (fast path)  54.3  54.8  55.4  2.19X  2.18X  2.17X
+ (day split)  196  199  200  7.89X  7.92X  7.81X
UtcFromUnixTimeMicros: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
(relative) (relative) (relative)
---------------------------------------------------------------------------------------------------------
- (sec split (old))  17.9  18.7  19.1  1X  1X  1X
- (day split)  111  116  118  6.21X  6.19X  6.19X
+ (sec split (old))  30.1  31  31.7  1X  1X  1X
+ (day split)  526  532  534  17.5X  17.2X  16.9X
FromUnixTimeNanos: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
(relative) (relative) (relative)
---------------------------------------------------------------------------------------------------------
- (sec split (old))  18.7  19.5  19.8  1X  1X  1X
- (sec split (new))  104  108  110  5.58X  5.55X  5.57X
+ (sec split (old))  36.8  37.5  39  1X  1X  1X
+ (sec split (new))  319  323  324  8.68X  8.61X  8.33X
FromSubsecondUnixTime: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
(relative) (relative) (relative)
---------------------------------------------------------------------------------------------------------
- (old)  18.7  18.7  18.7  1X  1X  1X
- (new)  73.5  74.1  74.1  3.94X  3.96X  3.96X
+ (old)  37.1  38  38.7  1X  1X  1X
+ (new)  175  175  177  4.71X  4.6X  4.59X
Number of threads: 8
UtcToUnixTime:
- (glibc) elapsed time: 1s020ms
- (Google/CCTZ) elapsed time: 144ms
- (boost) elapsed time: 10ms
-cctz speedup: 7.0784
-boost speedup: 95.1732
+ (glibc) elapsed time: 793ms
+ (Google/CCTZ) elapsed time: 152ms
+ (boost) elapsed time: 3ms
+cctz speedup: 5.20977
+boost speedup: 260.517
LocalToUnixTime:
- (glibc) elapsed time: 18s050ms
- (Google/CCTZ) elapsed time: 212ms
-speedup: 84.9949
+ (glibc) elapsed time: 21s750ms
+ (Google/CCTZ) elapsed time: 242ms
+speedup: 89.6388
FromUtc:
- (boost) elapsed time: 1s519ms
- (Google/CCTZ) elapsed time: 263ms
-speedup: 5.77003
+ (boost) elapsed time: 1s298ms
+ (Google/CCTZ) elapsed time: 279ms
+speedup: 4.64642
ToUtc:
- (boost) elapsed time: 1s674ms
- (Google/CCTZ) elapsed time: 325ms
-speedup: 5.13874
+ (boost) elapsed time: 1s436ms
+ (Google/CCTZ) elapsed time: 496ms
+speedup: 2.89263
UtcToLocal:
- (glibc) elapsed time: 4s862ms
- (Google/CCTZ) elapsed time: 263ms
-speedup: 18.4253
+ (glibc) elapsed time: 4s565ms
+ (Google/CCTZ) elapsed time: 260ms
+ (JVM) elapsed time: 2s507ms
+cctz speedup: 17.5058
+jvm speedup: 1.82038
UnixTimeToLocalPtime:
- (glibc) elapsed time: 4s856ms
- (Google/CCTZ) elapsed time: 259ms
-speedup: 18.7398
+ (glibc) elapsed time: 4s576ms
+ (Google/CCTZ) elapsed time: 268ms
+speedup: 17.0547
UnixTimeToUtcPtime:
- (glibc) elapsed time: 928ms
- (Google/CCTZ) elapsed time: 282ms
- (fast path) elapsed time: 90ms
-cctz speedup: 3.28187
-fast path speedup: 10.2951
+ (glibc) elapsed time: 478ms
+ (Google/CCTZ) elapsed time: 123ms
+ (fast path) elapsed time: 25ms
+cctz speedup: 3.87304
+fast path speedup: 18.9835
*/
vector<TimestampValue> AddTestDataDateTimes(int n, const string& startstr) {
@@ -223,11 +228,11 @@ public:
}
// Create and start threads.
- vector<unique_ptr<thread>> threads(num_of_threads);
+ vector<unique_ptr<std::thread>> threads(num_of_threads);
StopWatch sw;
sw.Start();
for (int i = 0; i < num_of_threads; ++i) {
- threads[i] = make_unique<thread>(
+ threads[i] = make_unique<std::thread>(
run_test, batch_size, test_data[i].get());
}
@@ -403,6 +408,13 @@ TimestampValue cctz_utc_to_local(const TimestampValue& ts_value) {
return TimestampValue(d, t);
}
+// JVM
+TimestampValue jvm_utc_to_local(const TimestampValue& ts_value) {
+ TimestampValue result = ts_value;
+ result.HiveLegacyUtcToLocal(*PTR_CCTZ_LOCAL_TZ);
+ return result;
+}
+
//
// Test FromUtc (CCTZ is expected to be faster than boost)
@@ -603,8 +615,7 @@ TimestampValue old_split_utc_from_unix_time_nanos(const SplitNanoAndSecond& unix
TimestampValue new_split_utc_from_unix_time_nanos(const SplitNanoAndSecond& unix_time) {
// The TimestampValue version is used as it is hard to reproduce the same logic without
// accessing private members.
- return TimestampValue::FromUnixTimeNanos(unix_time.seconds, unix_time.nanos,
- &TimezoneDatabase::GetUtcTimezone());
+ return TimestampValue::FromUnixTimeNanos(unix_time.seconds, unix_time.nanos, UTCPTR);
}
TimestampValue from_subsecond_unix_time_old(const double& unix_time) {
@@ -621,8 +632,7 @@ TimestampValue from_subsecond_unix_time_new(const double& unix_time) {
const double ONE_BILLIONTH = 0.000000001;
int64_t unix_time_whole = unix_time;
int64_t nanos = (unix_time - unix_time_whole) / ONE_BILLIONTH;
- return TimestampValue::FromUnixTimeNanos(
- unix_time_whole, nanos, &TimezoneDatabase::GetUtcTimezone());
+ return TimestampValue::FromUnixTimeNanos(unix_time_whole, nanos, UTCPTR);
}
//
@@ -698,7 +708,11 @@ TimestampVal cctz_to_utc(const TimestampVal& ts_val) {
int main(int argc, char* argv[]) {
- CpuInfo::Init();
+ impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+ impala::InitFeSupport();
+ TestEnv test_env;
+ CHECK(test_env.Init().ok());
+
cout << Benchmark::GetMachineInfo() << endl;
ABORT_IF_ERROR(TimezoneDatabase::Initialize());
@@ -783,9 +797,12 @@ int main(int argc, char* argv[]) {
tsvalue_data;
TestData<TimestampValue, TimestampValue, cctz_utc_to_local> cctz_utc_to_local_data =
tsvalue_data;
+ TestData<TimestampValue, TimestampValue, jvm_utc_to_local> jvm_utc_to_local_data =
+ tsvalue_data;
glibc_utc_to_local_data.add_to_benchmark(bm_utc_to_local, "(glibc)");
cctz_utc_to_local_data.add_to_benchmark(bm_utc_to_local, "(Google/CCTZ)");
+ jvm_utc_to_local_data.add_to_benchmark(bm_utc_to_local, "(JVM)");
cout << bm_utc_to_local.Measure() << endl;
bail_if_results_dont_match(vector<const vector<TimestampValue>*>{
@@ -795,7 +812,7 @@ int main(int argc, char* argv[]) {
vector<time_t> time_data;
for (const TimestampValue& tsvalue: tsvalue_data) {
time_t unix_time;
- tsvalue.ToUnixTime(&TimezoneDatabase::GetUtcTimezone(), &unix_time);
+ tsvalue.ToUnixTime(UTCPTR, &unix_time);
time_data.push_back(unix_time);
}
@@ -859,7 +876,7 @@ int main(int argc, char* argv[]) {
for (int i = 0; i < tsvalue_data.size(); ++i) {
const TimestampValue& tsvalue = tsvalue_data[i];
time_t unix_time;
- tsvalue.ToUnixTime(&TimezoneDatabase::GetUtcTimezone(), &unix_time);
+ tsvalue.ToUnixTime(UTCPTR, &unix_time);
int micros = (i * 1001) % MICROS_PER_SEC; // add some sub-second part
microsec_data.push_back(unix_time * MICROS_PER_SEC + micros);
}
@@ -885,7 +902,7 @@ int main(int argc, char* argv[]) {
for (int i = 0; i < tsvalue_data.size(); ++i) {
const TimestampValue& tsvalue = tsvalue_data[i];
time_t unix_time;
- tsvalue.ToUnixTime(&TimezoneDatabase::GetUtcTimezone(), &unix_time);
+ tsvalue.ToUnixTime(UTCPTR, &unix_time);
int nanos = (i * 1001) % NANOS_PER_SEC; // add some sub-second part
nanosec_data.push_back(SplitNanoAndSecond {unix_time, nanos} );
}
@@ -911,7 +928,7 @@ int main(int argc, char* argv[]) {
for (int i = 0; i < tsvalue_data.size(); ++i) {
const TimestampValue& tsvalue = tsvalue_data[i];
time_t unix_time;
- tsvalue.ToUnixTime(&TimezoneDatabase::GetUtcTimezone(), &unix_time);
+ tsvalue.ToUnixTime(UTCPTR, &unix_time);
double nanos = (i * 1001) % NANOS_PER_SEC; // add some sub-second part
double_data.push_back((double)unix_time + nanos / NANOS_PER_SEC);
}
@@ -979,7 +996,10 @@ int main(int argc, char* argv[]) {
num_of_threads, BATCH_SIZE, tsvalue_data, "(glibc)");
m2 = cctz_utc_to_local_data.measure_multithreaded_elapsed_time(
num_of_threads, BATCH_SIZE, tsvalue_data, "(Google/CCTZ)");
- cout << "speedup: " << double(m1)/double(m2) << endl;
+ m3 = jvm_utc_to_local_data.measure_multithreaded_elapsed_time(
+ num_of_threads, BATCH_SIZE, tsvalue_data, "(JVM)");
+ cout << "cctz speedup: " << double(m1)/double(m2) << endl;
+ cout << "jvm speedup: " << double(m1)/double(m3) << endl;
// UnixTimeToLocalPtime
cout << endl << "UnixTimeToLocalPtime:" << endl;
diff --git a/be/src/benchmarks/date-benchmark.cc b/be/src/benchmarks/date-benchmark.cc
index 84ccd6324..cca4b0bb0 100644
--- a/be/src/benchmarks/date-benchmark.cc
+++ b/be/src/benchmarks/date-benchmark.cc
@@ -26,6 +26,8 @@
#include "runtime/datetime-simple-date-format-parser.h"
#include "runtime/date-parse-util.h"
#include "runtime/timestamp-parse-util.h"
+#include "runtime/timestamp-value.h"
+#include "runtime/timestamp-value.inline.h"
#include "util/benchmark.h"
#include "util/cpu-info.h"
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 9e33ff360..14aec5520 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -21,6 +21,8 @@
#include <queue>
#include <stack>
+#include <boost/algorithm/string.hpp>
+#include <boost/range/adaptor/transformed.hpp>
#include <gflags/gflags.h>
#include <gutil/strings/substitute.h>
@@ -53,6 +55,9 @@
#include "common/names.h"
+using boost::adaptors::transformed;
+using boost::algorithm::iequals;
+using boost::algorithm::join;
using std::move;
using std::sort;
using namespace impala;
@@ -564,14 +569,13 @@ Status HdfsParquetScanner::ResolveSchemaForStatFiltering(SlotDescriptor* slot_de
}
ColumnStatsReader HdfsParquetScanner::CreateStatsReader(
- const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
- SchemaNode* node, const ColumnType& col_type) {
+ const parquet::RowGroup& row_group, SchemaNode* node, const ColumnType& col_type) {
DCHECK(node);
int col_idx = node->col_idx;
DCHECK_LT(col_idx, row_group.columns.size());
- const vector<parquet::ColumnOrder>& col_orders = file_metadata.column_orders;
+ const vector<parquet::ColumnOrder>& col_orders = file_metadata_.column_orders;
const parquet::ColumnOrder* col_order =
col_idx < col_orders.size() ? &col_orders[col_idx] : nullptr;
@@ -585,8 +589,7 @@ ColumnStatsReader HdfsParquetScanner::CreateStatsReader(
return stat_reader;
}
-Status HdfsParquetScanner::EvaluateStatsConjuncts(
- const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
+Status HdfsParquetScanner::EvaluateStatsConjuncts(const parquet::RowGroup& row_group,
bool* skip_row_group) {
*skip_row_group = false;
@@ -623,7 +626,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
}
ColumnStatsReader stats_reader =
- CreateStatsReader(file_metadata, row_group, node, slot_desc->type());
+ CreateStatsReader(row_group, node, slot_desc->type());
bool all_nulls = false;
if (stats_reader.AllNulls(&all_nulls) && all_nulls) {
@@ -686,8 +689,7 @@ bool HdfsParquetScanner::FilterAlreadyDisabledOrOverlapWithColumnStats(
}
Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
- const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
- bool* skip_row_group) {
+ const parquet::RowGroup& row_group, bool* skip_row_group) {
*skip_row_group = false;
if (!state_->query_options().parquet_read_statistics) return Status::OK();
@@ -764,7 +766,7 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
break;
}
ColumnStatsReader stats_reader =
- CreateStatsReader(file_metadata, row_group, node, slot_desc->type());
+ CreateStatsReader(row_group, node, slot_desc->type());
bool all_nulls = false;
if (stats_reader.AllNulls(&all_nulls) && all_nulls) {
@@ -926,8 +928,7 @@ Status HdfsParquetScanner::NextRowGroup() {
// Evaluate row group statistics with stats conjuncts.
bool skip_row_group_on_stats;
- RETURN_IF_ERROR(
- EvaluateStatsConjuncts(file_metadata_, row_group, &skip_row_group_on_stats));
+ RETURN_IF_ERROR(EvaluateStatsConjuncts(row_group, &skip_row_group_on_stats));
if (skip_row_group_on_stats) {
COUNTER_ADD(num_stats_filtered_row_groups_counter_, 1);
continue;
@@ -935,8 +936,7 @@ Status HdfsParquetScanner::NextRowGroup() {
// Evaluate row group statistics with min/max filters.
bool skip_row_group_on_minmax;
- RETURN_IF_ERROR(
- EvaluateOverlapForRowGroup(file_metadata_, row_group, &skip_row_group_on_minmax));
+ RETURN_IF_ERROR(EvaluateOverlapForRowGroup(row_group, &skip_row_group_on_minmax));
if (skip_row_group_on_minmax) {
COUNTER_ADD(num_minmax_filtered_row_groups_counter_, 1);
continue;
@@ -1487,7 +1487,7 @@ Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters(
}
ColumnStatsReader stats_reader =
- CreateStatsReader(file_metadata_, row_group, node, slot_desc->type());
+ CreateStatsReader(row_group, node, slot_desc->type());
DCHECK_LT(col_idx, row_group.columns.size());
const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
@@ -1570,7 +1570,7 @@ Status HdfsParquetScanner::EvaluatePageIndex() {
}
int col_idx = node->col_idx;;
ColumnStatsReader stats_reader =
- CreateStatsReader(file_metadata_, row_group, node, slot_desc->type());
+ CreateStatsReader(row_group, node, slot_desc->type());
DCHECK_LT(col_idx, row_group.columns.size());
const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
@@ -2826,6 +2826,13 @@ Status HdfsParquetScanner::ProcessFooter() {
RETURN_IF_ERROR(ParquetMetadataUtils::ValidateFileVersion(file_metadata_,
filename()));
+ if (VLOG_FILE_IS_ON) {
+ VLOG_FILE << "Parquet metadata for " << filename() << " created by "
+ << file_metadata_.created_by << ":\n"
+ << join(file_metadata_.key_value_metadata | transformed(
+ [](parquet::KeyValue kv) { return kv.key + "=" + kv.value; }), "\n");
+ }
+
// IMPALA-3943: Do not throw an error for empty files for backwards compatibility.
if (file_metadata_.num_rows == 0) {
// Warn if the num_rows is inconsistent with the row group metadata.
@@ -3157,8 +3164,41 @@ ParquetTimestampDecoder HdfsParquetScanner::CreateTimestampDecoder(
state_->query_options().convert_legacy_hive_parquet_utc_timestamps &&
state_->local_time_zone() != UTCPTR;
- return ParquetTimestampDecoder(element, state_->local_time_zone(),
- timestamp_conversion_needed_for_int96_timestamps);
+ const Timezone* timezone = state_->local_time_zone();
+ bool hive_legacy_conversion = false;
+ if (timestamp_conversion_needed_for_int96_timestamps && GetHiveZoneConversionLegacy()) {
+ VLOG_FILE << "Using Hive legacy timezone conversion";
+ hive_legacy_conversion = true;
+ }
+
+ return ParquetTimestampDecoder(element, timezone,
+ timestamp_conversion_needed_for_int96_timestamps, hive_legacy_conversion);
+}
+
+bool HdfsParquetScanner::GetHiveZoneConversionLegacy() const {
+ string writer_zone_conversion_legacy;
+ string writer_time_zone;
+ for (const parquet::KeyValue& kv : file_metadata_.key_value_metadata) {
+ if (kv.key == "writer.zone.conversion.legacy") {
+ writer_zone_conversion_legacy = kv.value;
+ } else if (kv.key == "writer.time.zone") {
+ writer_time_zone = kv.value;
+ }
+ }
+
+ if (writer_zone_conversion_legacy != "") {
+ return iequals(writer_zone_conversion_legacy, "true");
+ }
+
+ // There are no explicit meta about the legacy conversion.
+ if (writer_time_zone != "") {
+ // There is meta about the timezone thus we can infer that when the file was written,
+ // the new APIs were used.
+ return false;
+ }
+
+ // There is no (relevant) metadata in the file, use the configuration.
+ return state_->query_options().use_legacy_hive_timestamp_conversion;
}
void HdfsParquetScanner::UpdateCompressedPageSizeCounter(int64_t compressed_page_size) {
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index f3c8b51e1..42cfd7d2e 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -587,10 +587,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
}
/// Evaluates the min/max predicates of the 'scan_node_' using the parquet::Statistics
- /// of 'row_group'. 'file_metadata' is used to determine the ordering that was used to
- /// compute the statistics. Sets 'skip_row_group' to true if the row group can be
- /// skipped, 'false' otherwise.
- Status EvaluateStatsConjuncts(const parquet::FileMetaData& file_metadata,
+ /// of 'row_group'. Sets 'skip_row_group' to true if the row group can be skipped,
+ /// 'false' otherwise.
+ Status EvaluateStatsConjuncts(
const parquet::RowGroup& row_group, bool* skip_row_group) WARN_UNUSED_RESULT;
/// Advances 'row_group_idx_' to the next non-empty row group and initializes
@@ -601,12 +600,10 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
Status NextRowGroup() WARN_UNUSED_RESULT;
/// Evaluates the overlap predicates of the 'scan_node_' using the parquet::Statistics
- /// of 'row_group'. 'file_metadata' is used to determine the ordering that was used to
- /// compute the statistics. Sets 'skip_row_group' to true if the row group can be
- /// skipped, 'false' otherwise.
+ /// of 'row_group'. Sets 'skip_row_group' to true if the row group can be skipped,
+ /// 'false' otherwise.
Status EvaluateOverlapForRowGroup(
- const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
- bool* skip_row_group);
+ const parquet::RowGroup& row_group, bool* skip_row_group);
/// Return true if filter 'minmax_filter' of fitler id 'filter_id' is too close to
/// column min/max stats available at the target desc entry targets[0] in
@@ -631,12 +628,12 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
SchemaNode** schema_node_ptr = nullptr);
/// Create a ColumnStatsReader object for a column chunk described by a schema
- /// path in a slot descriptor 'slot_desc'. 'file_metadata', 'row_group', 'node',
- /// and 'col_type' provide extra data needed.
+ /// path in a slot descriptor 'slot_desc'. 'row_group', 'node', and 'col_type' provide
+ /// extra data needed.
/// On return:
/// A column chunk stats reader ('ColumnStatsReader') is returned.
- ColumnStatsReader CreateStatsReader(const parquet::FileMetaData& file_metadata,
- const parquet::RowGroup& row_group, SchemaNode* node, const ColumnType& col_type);
+ ColumnStatsReader CreateStatsReader(const parquet::RowGroup& row_group,
+ SchemaNode* node, const ColumnType& col_type);
/// Return the overlap predicate descs from the HDFS scan plan.
const vector<TOverlapPredicateDesc>& GetOverlapPredicateDescs();
@@ -944,6 +941,10 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
/// then we skip to row index 'skip_to_row'.
Status SkipRowsForColumns(const vector<ParquetColumnReader*>& column_readers,
int64_t* num_rows_to_skip, int64_t* skip_to_row);
+
+ /// Returns whether Hive legacy zone conversion should be used for transforming
+ /// timestamps based on file metadata and configuration.
+ bool GetHiveZoneConversionLegacy() const;
};
} // namespace impala
diff --git a/be/src/exec/parquet/parquet-common.cc b/be/src/exec/parquet/parquet-common.cc
index 34133ea4d..ee284e4b3 100644
--- a/be/src/exec/parquet/parquet-common.cc
+++ b/be/src/exec/parquet/parquet-common.cc
@@ -255,7 +255,8 @@ bool ParquetTimestampDecoder::GetTimestampInfoFromSchema(const parquet::SchemaEl
}
ParquetTimestampDecoder::ParquetTimestampDecoder(const parquet::SchemaElement& e,
- const Timezone* timezone, bool convert_int96_timestamps) {
+ const Timezone* timezone, bool convert_int96_timestamps,
+ bool hive_legacy_conversion) : hive_legacy_conversion_(hive_legacy_conversion) {
bool needs_conversion = false;
bool valid_schema = GetTimestampInfoFromSchema(e, precision_, needs_conversion);
DCHECK(valid_schema); // Invalid schemas should be rejected in an earlier step.
@@ -267,7 +268,15 @@ void ParquetTimestampDecoder::ConvertMinStatToLocalTime(TimestampValue* v) const
DCHECK(timezone_ != nullptr);
if (!v->HasDateAndTime()) return;
TimestampValue repeated_period_start;
- v->UtcToLocal(*timezone_, &repeated_period_start);
+ if (hive_legacy_conversion_) {
+ // Hive legacy conversion does not have efficient tools for identifying repeated
+ // periods, so subtract a day to ensure we cover all possible repeated periods
+ // (such as switching from UTC- to UTC+ near the international date line).
+ v->HiveLegacyUtcToLocal(*timezone_);
+ v->Subtract(boost::posix_time::hours(24));
+ } else {
+ v->UtcToLocal(*timezone_, &repeated_period_start);
+ }
if (repeated_period_start.HasDateAndTime()) *v = repeated_period_start;
}
@@ -275,7 +284,15 @@ void ParquetTimestampDecoder::ConvertMaxStatToLocalTime(TimestampValue* v) const
DCHECK(timezone_ != nullptr);
if (!v->HasDateAndTime()) return;
TimestampValue repeated_period_end;
- v->UtcToLocal(*timezone_, nullptr, &repeated_period_end);
+ if (hive_legacy_conversion_) {
+ // Hive legacy conversion does not have efficient tools for identifying repeated
+ // periods, so add a day to ensure we cover all possible repeated periods
+ // (such as switching from UTC- to UTC+ near the international date line).
+ v->HiveLegacyUtcToLocal(*timezone_);
+ v->Add(boost::posix_time::hours(24));
+ } else {
+ v->UtcToLocal(*timezone_, nullptr, &repeated_period_end);
+ }
if (repeated_period_end.HasDateAndTime()) *v = repeated_period_end;
}
}
diff --git a/be/src/exec/parquet/parquet-common.h b/be/src/exec/parquet/parquet-common.h
index 2b215e150..72ad18015 100644
--- a/be/src/exec/parquet/parquet-common.h
+++ b/be/src/exec/parquet/parquet-common.h
@@ -764,7 +764,7 @@ public:
ParquetTimestampDecoder() {}
ParquetTimestampDecoder( const parquet::SchemaElement& e, const Timezone* timezone,
- bool convert_int96_timestamps);
+ bool convert_int96_timestamps, bool hive_legacy_conversion);
bool NeedsConversion() const { return timezone_ != nullptr; }
@@ -798,7 +798,13 @@ public:
void ConvertToLocalTime(TimestampValue* v) const {
DCHECK(timezone_ != nullptr);
- if (v->HasDateAndTime()) v->UtcToLocal(*timezone_);
+ if (v->HasDateAndTime()) {
+ if (hive_legacy_conversion_) {
+ v->HiveLegacyUtcToLocal(*timezone_);
+ } else {
+ v->UtcToLocal(*timezone_);
+ }
+ }
}
/// Timezone conversion of min/max stats need some extra logic because UTC->local
@@ -831,6 +837,9 @@ private:
/// INT64 decoding. INT64 with nanosecond precision (and reduced range) is also planned
/// to be implemented once it is added in Parquet (PARQUET-1387).
Precision precision_ = NANO;
+
+ /// Use Hive legacy-compatible conversion with Java DateFormat.
+ bool hive_legacy_conversion_ = false;
};
template <>
diff --git a/be/src/runtime/raw-value-test.cc b/be/src/runtime/raw-value-test.cc
index 090178f1e..066adca69 100644
--- a/be/src/runtime/raw-value-test.cc
+++ b/be/src/runtime/raw-value-test.cc
@@ -19,6 +19,8 @@
#include "exprs/timezone_db.h"
#include "runtime/raw-value.inline.h"
+#include "runtime/timestamp-value.h"
+#include "runtime/timestamp-value.inline.h"
#include "testutil/gtest-util.h"
#include "common/names.h"
diff --git a/be/src/runtime/timestamp-value.cc b/be/src/runtime/timestamp-value.cc
index 417ef467d..9429704b8 100644
--- a/be/src/runtime/timestamp-value.cc
+++ b/be/src/runtime/timestamp-value.cc
@@ -17,14 +17,19 @@
#include "runtime/timestamp-value.h"
+#include <boost/algorithm/string/predicate.hpp>
+
#include "exprs/timestamp-functions.h"
#include "exprs/timezone_db.h"
#include "runtime/datetime-simple-date-format-parser.h"
+#include "runtime/exec-env.h"
#include "runtime/timestamp-parse-util.h"
#include "runtime/timestamp-value.inline.h"
+#include "service/frontend.h"
#include "common/names.h"
+using boost::algorithm::starts_with;
using boost::date_time::not_a_date_time;
using boost::gregorian::date;
using boost::gregorian::date_duration;
@@ -155,6 +160,32 @@ void TimestampValue::UtcToLocal(const Timezone& local_tz,
}
}
+void TimestampValue::HiveLegacyUtcToLocal(const Timezone& local_tz) {
+ DCHECK(HasDateAndTime());
+ int64_t utc_time_millis;
+ if (UNLIKELY(!FloorUtcToUnixTimeMillis(&utc_time_millis))) {
+ SetToInvalidDateTime();
+ return;
+ }
+
+ string tz = local_tz.name();
+ static constexpr std::string_view zoneinfo = "/usr/share/zoneinfo/";
+ if (starts_with(tz, zoneinfo)) {
+ tz = tz.substr(zoneinfo.size());
+ }
+
+ TCivilTime cs;
+ Status status = ExecEnv::GetInstance()->frontend()->HiveLegacyTimezoneConvert(
+ tz, utc_time_millis, &cs);
+ if (UNLIKELY(!status.ok())) {
+ // This would result in log spam. However it should be impossible to fail.
+ LOG(ERROR) << "Timezone " << tz << " cannot be used with legacy Hive conversion.";
+ return;
+ }
+ date_ = boost::gregorian::date(cs.year, cs.month, cs.day);
+ time_ = time_duration(cs.hour, cs.minute, cs.second, time_.fractional_seconds());
+}
+
void TimestampValue::LocalToUtc(const Timezone& local_tz,
TimestampValue* pre_utc_if_repeated, TimestampValue* post_utc_if_repeated) {
DCHECK(HasDateAndTime());
@@ -227,14 +258,6 @@ TimestampValue TimestampValue::UnixTimeToLocal(
}
}
-TimestampValue TimestampValue::FromUnixTime(time_t unix_time, const Timezone* local_tz) {
- if (local_tz != UTCPTR) {
- return UnixTimeToLocal(unix_time, *local_tz);
- } else {
- return UtcFromUnixTimeTicks<1>(unix_time);
- }
-}
-
void TimestampValue::ToString(string& dst) const {
dst.resize(SimpleDateFormatTokenizer::DEFAULT_DATE_TIME_FMT_LEN);
const int out_len = TimestampParser::FormatDefault(date(), time(), dst.data());
diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h
index f1fb126a8..02cfe7a62 100644
--- a/be/src/runtime/timestamp-value.h
+++ b/be/src/runtime/timestamp-value.h
@@ -296,6 +296,11 @@ class TimestampValue {
TimestampValue* start_of_repeated_period = nullptr,
TimestampValue* end_of_repeated_period = nullptr);
+ /// Converts from UTC to 'local_tz' time zone in-place. The caller must ensure the
+ /// TimestampValue this function is called upon has both a valid date and time. Uses
+ /// Java Calendar for conversion to match Hive's legacy conversion process.
+ void HiveLegacyUtcToLocal(const Timezone& local_tz);
+
/// Converts from 'local_tz' to UTC time zone in-place. The caller must ensure the
/// TimestampValue this function is called upon has both a valid date and time.
///
diff --git a/be/src/runtime/timestamp-value.inline.h b/be/src/runtime/timestamp-value.inline.h
index d49dbb2ae..6cfc63b6d 100644
--- a/be/src/runtime/timestamp-value.inline.h
+++ b/be/src/runtime/timestamp-value.inline.h
@@ -77,6 +77,15 @@ inline TimestampValue TimestampValue::UtcFromUnixTimeLimitedRangeNanos(
return UtcFromUnixTimeTicks<NANOS_PER_SEC>(unix_time_nanos);
}
+inline TimestampValue TimestampValue::FromUnixTime(time_t unix_time,
+ const Timezone* local_tz) {
+ if (local_tz != UTCPTR) {
+ return UnixTimeToLocal(unix_time, *local_tz);
+ } else {
+ return UtcFromUnixTimeTicks<1>(unix_time);
+ }
+}
+
inline TimestampValue TimestampValue::FromUnixTimeNanos(time_t unix_time, int64_t nanos,
const Timezone* local_tz) {
unix_time =
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index aff23809f..22f3cdb77 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -157,7 +157,8 @@ Frontend::Frontend() {
};
JniMethodDescriptor staticMethods[] = {
- {"getSecretFromKeyStore", "([B)Ljava/lang/String;", &get_secret_from_key_store_}
+ {"getSecretFromKeyStore", "([B)Ljava/lang/String;", &get_secret_from_key_store_},
+ {"hiveLegacyTimezoneConvert", "([BJ)[B", &hive_legacy_timezone_convert_}
};
JNIEnv* jni_env = JniUtil::GetJNIEnv();
@@ -436,3 +437,12 @@ Status Frontend::GetSecretFromKeyStore(const string& secret_key, string* secret)
return JniUtil::CallStaticJniMethod(fe_class_, get_secret_from_key_store_, secret_key_t,
secret);
}
+
+Status Frontend::HiveLegacyTimezoneConvert(
+ const string& timezone, long utc_time_millis, TCivilTime* local_time) {
+ TStringLiteral timezone_t;
+ timezone_t.__set_value(timezone);
+ return JniCall::static_method(fe_class_, hive_legacy_timezone_convert_)
+ .with_thrift_arg(timezone_t).with_primitive_arg(utc_time_millis)
+ .Call(local_time);
+}
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index 823f98ca8..e2f0b551f 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -249,6 +249,10 @@ class Frontend {
/// Get secret from jceks key store for the input secret_key.
Status GetSecretFromKeyStore(const string& secret_key, string* secret);
+ /// Convert UTC UNIX time (in millis) to target timezone using Hive legacy conversion.
+ Status HiveLegacyTimezoneConvert(
+ const string& timezone, long utc_time_millis, TCivilTime* local_time);
+
private:
jclass fe_class_; // org.apache.impala.service.JniFrontend class
jobject fe_; // instance of org.apache.impala.service.JniFrontend
@@ -292,6 +296,7 @@ class Frontend {
jmethodID commit_kudu_txn_; // JniFrontend.commitKuduTransaction()
jmethodID convertTable; // JniFrontend.convertTable
jmethodID get_secret_from_key_store_; // JniFrontend.getSecretFromKeyStore()
+ jmethodID hive_legacy_timezone_convert_; // JniFrontend.hiveLegacyTimezoneConvert()
// Only used for testing.
jmethodID build_test_descriptor_table_id_; // JniFrontend.buildTestDescriptorTable()
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 44d571a80..5da1c6eb8 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1330,6 +1330,10 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va
query_options->__set_estimate_duplicate_in_preagg(IsTrue(value));
break;
}
+ case TImpalaQueryOptions::USE_LEGACY_HIVE_TIMESTAMP_CONVERSION: {
+ query_options->__set_use_legacy_hive_timestamp_conversion(IsTrue(value));
+ break;
+ }
default:
string key = to_string(option);
if (IsRemovedQueryOption(key)) {
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index b5587ec2a..6ce06aea7 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -51,7 +51,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// plus one. Thus, the second argument to the DCHECK has to be updated every
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
constexpr unsigned NUM_QUERY_OPTIONS =
- TImpalaQueryOptions::ESTIMATE_DUPLICATE_IN_PREAGG + 1;
+ TImpalaQueryOptions::USE_LEGACY_HIVE_TIMESTAMP_CONVERSION + 1;
#define QUERY_OPTS_TABLE \
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), NUM_QUERY_OPTIONS); \
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
@@ -364,6 +364,8 @@ constexpr unsigned NUM_QUERY_OPTIONS =
ENABLE_TUPLE_ANALYSIS_IN_AGGREGATE, TQueryOptionLevel::ADVANCED) \
QUERY_OPT_FN(estimate_duplicate_in_preagg, \
ESTIMATE_DUPLICATE_IN_PREAGG, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(use_legacy_hive_timestamp_conversion, \
+ USE_LEGACY_HIVE_TIMESTAMP_CONVERSION, TQueryOptionLevel::ADVANCED)
;
/// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index bd70374e9..15007ae2a 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -1092,3 +1092,17 @@ struct TWrappedHttpResponse {
5: optional string content
6: optional string content_type
}
+
+// Captures civil time - local time in a specific time zone - mirroring
+// cctz::civil_second. Used to serialize Java timezone conversions back to C++ code.
+// Omits subsecond measurements because
+// - matches cctz::civil_second; no known timezone libraries have subsecond adjustments
+// - Java timezone conversion is only accurate to milliseconds, but we use nanoseconds
+struct TCivilTime {
+ 1: required i32 year
+ 2: required i32 month
+ 3: required i32 day
+ 4: required i32 hour
+ 5: required i32 minute
+ 6: required i32 second
+}
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 90992a269..b28754848 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -977,6 +977,15 @@ enum TImpalaQueryOptions {
// If True, account for probability of having duplicate grouping key exist in multiple
// nodes during preaggreation.
ESTIMATE_DUPLICATE_IN_PREAGG = 185
+
+ // When true and CONVERT_LEGACY_HIVE_PARQUET_UTC_TIMESTAMPS is also enabled, TIMESTAMP
+ // conversion to local time will fallback to the timestamp conversion method from Hive
+ // 3.0 and earlier if not specified in the file. This matches the Hive option
+ // 'hive.parquet.timestamp.legacy.conversion.enabled', which defaults to true. Impala
+ // defaults to false because conversion is ~50x slower than Impala's default conversion
+ // method and they produce the same results for modern time periods (post 1970, and in
+ // most instances before that).
+ USE_LEGACY_HIVE_TIMESTAMP_CONVERSION = 186
}
// The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 851f5a573..0e8458082 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -759,6 +759,9 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift
186: optional bool estimate_duplicate_in_preagg = true
+
+ // See comment in ImpalaService.thrift
+ 187: optional bool use_legacy_hive_timestamp_conversion = false;
}
// Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/docs/topics/impala_timestamp.xml b/docs/topics/impala_timestamp.xml
index efa367e83..33e7d40ba 100644
--- a/docs/topics/impala_timestamp.xml
+++ b/docs/topics/impala_timestamp.xml
@@ -263,6 +263,43 @@ DATE_ADD (<varname>timestamp</varname>, INTERVAL <varname>interval</varname> <va
Parquet files written by Hive.
</p>
+ <p>
+ Hive versions prior to 3.1 wrote Parquet files in local time using Java's
+ SimpleDateFormat. That method has some cases that differ from both Impala's
+ method and the default method used in Hive 3.1.2+ that are based on the
+ <xref href="https://www.iana.org/time-zones" format="html" scope="external">
+ IANA Time Zone Database</xref>. Hive 4 added the
+ <codeph>writer.zone.conversion.legacy</codeph> Parquet file metadata property
+ to identify which method was used to write the file (controlled by
+ <codeph>hive.parquet.timestamp.write.legacy.conversion.enabled</codeph>). When
+ the Parquet file was written by Parquet Java (<codeph>parquet-mr</codeph>), Hive -
+ and Impala's behavior when
+ <codeph>convert_legacy_hive_parquet_utc_timestamps</codeph> is
+ <codeph>true</codeph> - are:
+ <ul>
+ <li>
+ If <codeph>writer.zone.conversion.legacy</codeph> is present, use the legacy
+ conversion method if true, use the newer method if false.
+ </li>
+ <li>
+ If <codeph>writer.zone.conversion.legacy</codeph> is not present but
+ <codeph>writer.time.zone</codeph> is, we can infer the file was written by
+ Hive 3.1.2+ using new APIs and use the newer method.
+ </li>
+ <li>
+ Otherwise assume it was written by an earlier Hive release. In that case
+ Hive will select conversion method based on
+ <codeph>hive.parquet.timestamp.legacy.conversion.enabled</codeph> (defaults
+ to <codeph>true</codeph>). <keyword keyref="impala45"/> adds the query
+ option <codeph>use_legacy_hive_timestamp_conversion</codeph> to select this
+ behavior. It defaults to <codeph>false</codeph> because conversion is ~50x
+ slower than Impala's default conversion method and they produce the same
+ results for modern time periods (post 1970, and in most instances before
+ that).
+ </li>
+ </ul>
+ </p>
+
<p>
Hive currently cannot write <codeph>INT64</codeph> <codeph>TIMESTAMP</codeph> values.
</p>
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 3a3e4d77f..c737d2f17 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -54,6 +54,7 @@ import org.apache.impala.service.Frontend.PlanCtx;
import org.apache.impala.thrift.TBackendGflags;
import org.apache.impala.thrift.TBuildTestDescriptorTableParams;
import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCivilTime;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TDescribeDbParams;
import org.apache.impala.thrift.TDescribeResult;
@@ -115,11 +116,14 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.IllegalArgumentException;
+import java.util.Calendar;
import java.util.Collections;
+import java.util.GregorianCalendar;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TimeZone;
/**
* JNI-callable interface onto a wrapped Frontend instance. The main point is to serialise
@@ -808,6 +812,29 @@ public class JniFrontend {
return secret;
}
+ /**
+ * Performs Hive legacy-compatible timezone conversion
+ */
+ public static byte[] hiveLegacyTimezoneConvert(byte[] timezoneT, long utc_time_millis)
+ throws ImpalaException {
+ final TStringLiteral timezone = new TStringLiteral();
+ JniUtil.deserializeThrift(protocolFactory_, timezone, timezoneT);
+ // TimeZone.getTimeZone defaults to GMT if it doesn't recognize the timezone.
+ Calendar c = new GregorianCalendar(TimeZone.getTimeZone(timezone.getValue()));
+ c.setTimeInMillis(utc_time_millis);
+
+ final TCivilTime civilTime = new TCivilTime(
+ // Normalize month so January starts at 1.
+ c.get(Calendar.YEAR), c.get(Calendar.MONTH)+1, c.get(Calendar.DAY_OF_MONTH),
+ c.get(Calendar.HOUR_OF_DAY), c.get(Calendar.MINUTE), c.get(Calendar.SECOND));
+ try {
+ TSerializer serializer = new TSerializer(protocolFactory_);
+ return serializer.serialize(civilTime);
+ } catch (TException e) {
+ throw new InternalException(e.getMessage());
+ }
+ }
+
public String validateSaml2Bearer(byte[] serializedRequest) throws ImpalaException{
Preconditions.checkNotNull(frontend_);
Preconditions.checkNotNull(frontend_.getSaml2Client());
diff --git a/testdata/data/employee_hive_3_1_3_us_pacific.parquet b/testdata/data/employee_hive_3_1_3_us_pacific.parquet
new file mode 100644
index 000000000..2125bc688
Binary files /dev/null and b/testdata/data/employee_hive_3_1_3_us_pacific.parquet differ
diff --git a/testdata/data/hive_kuala_lumpur_legacy.parquet b/testdata/data/hive_kuala_lumpur_legacy.parquet
new file mode 100644
index 000000000..f03efd4ff
Binary files /dev/null and b/testdata/data/hive_kuala_lumpur_legacy.parquet differ
diff --git a/testdata/data/tbl_parq1/000000_0 b/testdata/data/tbl_parq1/000000_0
new file mode 100644
index 000000000..17a910f52
Binary files /dev/null and b/testdata/data/tbl_parq1/000000_0 differ
diff --git a/testdata/data/tbl_parq1/000000_1 b/testdata/data/tbl_parq1/000000_1
new file mode 100644
index 000000000..cc0c27e6a
Binary files /dev/null and b/testdata/data/tbl_parq1/000000_1 differ
diff --git a/testdata/data/tbl_parq1/000000_2 b/testdata/data/tbl_parq1/000000_2
new file mode 100644
index 000000000..ba04470e2
Binary files /dev/null and b/testdata/data/tbl_parq1/000000_2 differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-313.test b/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-313.test
new file mode 100644
index 000000000..b92aa96fc
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-313.test
@@ -0,0 +1,46 @@
+====
+---- QUERY
+SET timezone=PST;
+SELECT * FROM employee_hive_3_1_3_us_pacific;
+---- TYPES
+INT,TIMESTAMP
+---- RESULTS
+1,1880-01-01 07:52:58
+2,1884-01-01 08:00:00
+3,1990-01-01 08:00:00
+====
+---- QUERY
+SET timezone=PST;
+SET convert_legacy_hive_parquet_utc_timestamps=true;
+SELECT * FROM employee_hive_3_1_3_us_pacific;
+---- TYPES
+INT,TIMESTAMP
+---- RESULTS
+1,1880-01-01 00:00:00
+2,1884-01-01 00:00:00
+3,1990-01-01 00:00:00
+====
+---- QUERY
+SET timezone=PST;
+SET convert_legacy_hive_parquet_utc_timestamps=true;
+SET use_legacy_hive_timestamp_conversion=true;
+SELECT * FROM employee_hive_3_1_3_us_pacific;
+---- TYPES
+INT,TIMESTAMP
+---- RESULTS
+1,1880-01-01 00:00:00
+2,1884-01-01 00:00:00
+3,1990-01-01 00:00:00
+====
+---- QUERY
+SET timezone=UTC;
+SET convert_legacy_hive_parquet_utc_timestamps=true;
+SET use_legacy_hive_timestamp_conversion=true;
+SELECT * FROM employee_hive_3_1_3_us_pacific;
+---- TYPES
+INT,TIMESTAMP
+---- RESULTS
+1,1880-01-01 07:52:58
+2,1884-01-01 08:00:00
+3,1990-01-01 08:00:00
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-3m.test b/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-3m.test
new file mode 100644
index 000000000..84e61bff3
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-3m.test
@@ -0,0 +1,34 @@
+====
+---- QUERY
+SET timezone="Asia/Singapore";
+SELECT * FROM t;
+---- RESULTS
+1899-12-31 16:00:00
+1899-12-31 16:00:00
+1899-12-31 17:04:35
+2020-04-08 05:17:05.215000000
+2020-04-08 05:17:05.215000000
+====
+---- QUERY
+SET timezone="Asia/Singapore";
+SET convert_legacy_hive_parquet_utc_timestamps=true;
+SELECT * FROM t;
+---- RESULTS
+1899-12-31 22:55:25
+1899-12-31 22:55:25
+1900-01-01 00:00:00
+2020-04-08 13:17:05.215000000
+2020-04-08 13:17:05.215000000
+====
+---- QUERY
+SET timezone="Asia/Singapore";
+SET convert_legacy_hive_parquet_utc_timestamps=true;
+SET use_legacy_hive_timestamp_conversion=true;
+SELECT * FROM t;
+---- RESULTS
+1899-12-31 22:55:25
+1900-01-01 00:00:00
+1900-01-01 00:00:00
+2020-04-08 13:17:05.215000000
+2020-04-08 13:17:05.215000000
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-4.test b/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-4.test
new file mode 100644
index 000000000..f97a805c3
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-4.test
@@ -0,0 +1,28 @@
+====
+---- QUERY
+SET timezone="Asia/Kuala_Lumpur";
+SELECT * FROM hive_kuala_lumpur_legacy;
+---- RESULTS
+1899-12-31 16:00:00
+1909-12-31 17:00:00
+1934-12-31 16:40:00
+1939-12-31 16:40:00
+1941-12-31 16:30:00
+1943-12-31 15:00:00
+1969-01-28 16:30:00
+1999-12-31 16:00:00
+====
+---- QUERY
+SET timezone="Asia/Kuala_Lumpur";
+SET convert_legacy_hive_parquet_utc_timestamps=true;
+SELECT * FROM hive_kuala_lumpur_legacy;
+---- RESULTS
+1900-01-01 00:00:00
+1910-01-01 00:00:00
+1935-01-01 00:00:00
+1940-01-01 00:00:00
+1942-01-01 00:00:00
+1944-01-01 00:00:00
+1969-01-29 00:00:00
+2000-01-01 00:00:00
+====
diff --git a/tests/query_test/test_hive_timestamp_conversion.py b/tests/query_test/test_hive_timestamp_conversion.py
new file mode 100644
index 000000000..2d594f144
--- /dev/null
+++ b/tests/query_test/test_hive_timestamp_conversion.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.file_utils import create_table_and_copy_files, create_table_from_parquet
+
+
+class TestHiveParquetTimestampConversion(ImpalaTestSuite):
+ """Tests that Impala can read parquet files written by older versions of Hive or with
+ Hive legacy conversion enabled. Tests use convert_legacy_hive_parquet_utc_timestamps,
+ use_legacy_hive_timestamp_conversion, and timezone to test conversion."""
+
+ @classmethod
+ def get_workload(self):
+ return 'functional-query'
+
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestHiveParquetTimestampConversion, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.add_constraint(lambda v:
+ v.get_value('table_format').file_format == 'parquet'
+ and v.get_value('table_format').compression_codec == 'none')
+
+ def test_hive_4_legacy(self, vector, unique_database):
+ """Test that legacy conversion uses the same timezone conversion as Hive when
+ Parquet metadata contains writer.zone.conversion.legacy=true.
+
+ Load test data generated via Hive with TZ=Asia/Kuala_Lumpur:
+
+ create table t (d timestamp) stored as parquet;
+ set hive.parquet.timestamp.write.legacy.conversion.enabled=true;
+ insert into t values ("1900-01-01 00:00:00"), ("1910-01-01 00:00:00"),
+ ("1935-01-01 00:00:00"), ("1940-01-01 00:00:00"), ("1942-01-01 00:00:00"),
+ ("1944-01-01 00:00:00"), ("1969-01-29 00:00:00"), ("2000-01-01 00:00:00");
+ """
+ create_table_from_parquet(self.client, unique_database, "hive_kuala_lumpur_legacy")
+ self.run_test_case("QueryTest/timestamp-conversion-hive-4", vector, unique_database)
+
+ def test_hive_313(self, vector, unique_database):
+ """The parquet file was written with Hive 3.1.3 using the new Date/Time APIs
+ (legacy=false) to convert from US/Pacific to UTC. The presence of writer.time.zone in
+ the metadata of the file allows us to infer that new Date/Time APIs should be used for
+ the conversion. The use_legacy_hive_timestamp_conversion property shouldn't be taken
+ into account in this case.
+
+ Test file from https://github.com/apache/hive/blob/rel/release-4.0.1/data/files/
+ employee_hive_3_1_3_us_pacific.parquet"""
+ create_table_from_parquet(
+ self.client, unique_database, "employee_hive_3_1_3_us_pacific")
+ self.run_test_case("QueryTest/timestamp-conversion-hive-313", vector, unique_database)
+
+ def test_hive_3_mixed(self, vector, unique_database):
+ """Test table containing Hive legacy timestamps written with Hive prior to 3.1.3.
+
+ Test files target timezone=Asia/Singapore, sourced from
+ https://github.com/apache/hive/tree/rel/release-4.0.1/data/files/tbl_parq1."""
+ create_stmt = "create table %s.t (d timestamp) stored as parquet" % unique_database
+ create_table_and_copy_files(self.client, create_stmt, unique_database, "t",
+ ["testdata/data/tbl_parq1/" + f for f in ["000000_0", "000000_1", "000000_2"]])
+ self.run_test_case("QueryTest/timestamp-conversion-hive-3m", vector, unique_database)
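
Reviewer note (illustrative only, not part of the commit): the three-way selection described in the commit message and in impala_timestamp.xml can be sketched in Python roughly as below. This is a minimal sketch under stated assumptions. The function name choose_conversion, its parameters, and the returned labels are hypothetical and do not appear in Impala. It also assumes that created_by starts with "parquet-mr" and that the metadata value is the string "true" or "false". The real logic lives in the C++ Parquet reader and may differ in detail.

```python
# Hypothetical sketch of the selection rules described in the commit message.
# None of these names exist in Impala; they are for illustration only.

def choose_conversion(created_by, key_value_metadata, use_legacy_option=False):
    """Return "legacy", "tzdata", or None for a Parquet file.

    created_by:          the Parquet footer's created_by string
    key_value_metadata:  dict of the footer's key/value metadata
    use_legacy_option:   stands in for use_legacy_hive_timestamp_conversion
    """
    # Assumption: Hive files are recognized by a created_by starting with
    # "parquet-mr"; other writers are outside the scope of this rule.
    if not created_by.startswith("parquet-mr"):
        return None

    # Case 1 (Hive 4): the file states which method was used to write it.
    legacy = key_value_metadata.get("writer.zone.conversion.legacy")
    if legacy is not None:
        return "legacy" if legacy.strip().lower() == "true" else "tzdata"

    # Case 2 (Hive 3.1.2+): writer.time.zone present implies the newer APIs.
    if "writer.time.zone" in key_value_metadata:
        return "tzdata"

    # Case 3 (earlier Hive): nothing in the file, so the query option decides.
    return "legacy" if use_legacy_option else "tzdata"
```

Under these assumptions, a file with writer.time.zone but no writer.zone.conversion.legacy always maps to "tzdata" regardless of the query option, which is the behavior test_hive_313 expects.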