This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new f55077007 IMPALA-12426: Switches the duration fields to be stored in
decimal milliseconds.
f55077007 is described below
commit f55077007bf68e6cbeaa15cf270c333af847a1f1
Author: jasonmfehr <[email protected]>
AuthorDate: Mon Mar 25 17:03:33 2024 -0700
IMPALA-12426: Switches the duration fields to be stored in decimal milliseconds.
The original implementation of the completed queries table
stored durations in integer nanoseconds. This change
modifies the duration fields to be stored as milliseconds
with up to three decimal places (microsecond precision).
Also reduces the default max number of queued queries to a
number that will not consume as much memory.
Existing sys.impala_query_log tables will need to be
dropped.
Testing was accomplished by modifying the python custom
cluster tests.
Change-Id: I842951a132b7b8eadccb09a3674f4c34ac42ff1b
Reviewed-on: http://gerrit.cloudera.org:8080/21203
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/exec/system-table-scanner.cc | 65 ++++++++++-----
be/src/service/workload-management-fields.cc | 93 ++++++++++++++--------
be/src/service/workload-management-flags.cc | 2 +-
be/src/service/workload-management.cc | 8 +-
be/src/service/workload-management.h | 8 +-
common/thrift/SystemTables.thrift | 8 +-
.../org/apache/impala/catalog/SystemTable.java | 31 +++++---
tests/custom_cluster/test_query_live.py | 6 +-
tests/util/assert_time.py | 33 ++++----
tests/util/workload_management.py | 69 ++++++++--------
10 files changed, 201 insertions(+), 122 deletions(-)
diff --git a/be/src/exec/system-table-scanner.cc
b/be/src/exec/system-table-scanner.cc
index fa91fb165..87030ebdc 100644
--- a/be/src/exec/system-table-scanner.cc
+++ b/be/src/exec/system-table-scanner.cc
@@ -22,6 +22,8 @@
#include <gutil/strings/substitute.h>
#include "gen-cpp/SystemTables_types.h"
+#include "runtime/decimal-value.h"
+#include "runtime/decimal-value.inline.h"
#include "runtime/exec-env.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
@@ -45,6 +47,10 @@ namespace impala {
static const string ERROR_MEM_LIMIT_EXCEEDED =
"SystemTableScanNode::$0() failed to allocate $1 bytes.";
+// Constant declaring how to convert from micro and nano seconds to
milliseconds.
+static constexpr double MICROS_TO_MILLIS = 1000;
+static constexpr double NANOS_TO_MILLIS = 1000000;
+
Status SystemTableScanner::CreateScanner(RuntimeState* state, RuntimeProfile*
profile,
TSystemTableName::type table_name, std::unique_ptr<SystemTableScanner>*
scanner) {
switch (table_name) {
@@ -89,6 +95,23 @@ static void WriteIntSlot(int32_t value, void* slot) {
*reinterpret_cast<int32_t*>(slot) = value;
}
+// Writes 'value' into 'slot' as a decimal whose storage width is dictated by
+// 'type'. 'value' must already be scaled to the units of the target column.
+// Each case must 'break': falling through would re-write the slot as a wider
+// decimal, producing a wrong value and writing past the end of a 4- or 8-byte
+// slot into adjacent tuple memory.
+static void WriteDecimalSlot(
+    const ColumnType& type, double value, void* slot) {
+  bool overflow = false;
+  switch (type.GetByteSize()) {
+    case 4:
+      *reinterpret_cast<Decimal4Value*>(slot) =
+          Decimal4Value::FromDouble(type, value, false, &overflow);
+      break;
+    case 8:
+      *reinterpret_cast<Decimal8Value*>(slot) =
+          Decimal8Value::FromDouble(type, value, false, &overflow);
+      break;
+    case 16:
+      *reinterpret_cast<Decimal16Value*>(slot) =
+          Decimal16Value::FromDouble(type, value, false, &overflow);
+      break;
+    default:
+      DCHECK(false) << "Unexpected decimal byte size: " << type.GetByteSize();
+  }
+  DCHECK(!overflow);
+}
+
+
QueryScanner::QueryScanner(RuntimeState* state, RuntimeProfile* profile)
: SystemTableScanner(state, profile),
active_query_collection_timer_(ADD_TIMER(profile_,
"ActiveQueryCollectionTime")),
@@ -138,10 +161,11 @@ Status QueryScanner::Open() {
return Status::OK();
}
-static void WriteEvent(const QueryStateExpanded& query, void* slot, QueryEvent
name) {
+static void WriteEvent(const QueryStateExpanded& query, const SlotDescriptor*
slot_desc,
+ void* slot, QueryEvent name) {
const auto& event = query.events.find(name);
DCHECK(event != query.events.end());
- WriteBigIntSlot(event->second, slot);
+ WriteDecimalSlot(slot_desc->type(), event->second / NANOS_TO_MILLIS, slot);
}
Status QueryScanner::MaterializeNextTuple(
@@ -205,11 +229,11 @@ Status QueryScanner::MaterializeNextTuple(
case TQueryTableColumn::START_TIME_UTC:
WriteUnixTimestampSlot(record.start_time_us, slot);
break;
- case TQueryTableColumn::TOTAL_TIME_NS: {
+ case TQueryTableColumn::TOTAL_TIME_MS: {
const int64_t end_time_us =
record.end_time_us > 0 ? record.end_time_us : UnixMicros();
- const int64_t duration_us = end_time_us - record.start_time_us;
- WriteBigIntSlot(duration_us * 1000, slot);
+ double duration_us = (end_time_us - record.start_time_us) /
MICROS_TO_MILLIS;
+ WriteDecimalSlot(slot_desc->type(), duration_us, slot);
break;
}
case TQueryTableColumn::QUERY_OPTS_CONFIG:
@@ -262,41 +286,44 @@ Status QueryScanner::MaterializeNextTuple(
case TQueryTableColumn::ROW_MATERIALIZATION_ROWS_PER_SEC:
WriteBigIntSlot(query.row_materialization_rate, slot);
break;
- case TQueryTableColumn::ROW_MATERIALIZATION_TIME_NS:
- WriteBigIntSlot(query.row_materialization_time, slot);
+ case TQueryTableColumn::ROW_MATERIALIZATION_TIME_MS:
+ WriteDecimalSlot(slot_desc->type(),
+ query.row_materialization_time / NANOS_TO_MILLIS, slot);
break;
case TQueryTableColumn::COMPRESSED_BYTES_SPILLED:
WriteBigIntSlot(query.compressed_bytes_spilled, slot);
break;
case TQueryTableColumn::EVENT_PLANNING_FINISHED:
- WriteEvent(query, slot, QueryEvent::PLANNING_FINISHED);
+ WriteEvent(query, slot_desc, slot, QueryEvent::PLANNING_FINISHED);
break;
case TQueryTableColumn::EVENT_SUBMIT_FOR_ADMISSION:
- WriteEvent(query, slot, QueryEvent::SUBMIT_FOR_ADMISSION);
+ WriteEvent(query, slot_desc, slot, QueryEvent::SUBMIT_FOR_ADMISSION);
break;
case TQueryTableColumn::EVENT_COMPLETED_ADMISSION:
- WriteEvent(query, slot, QueryEvent::COMPLETED_ADMISSION);
+ WriteEvent(query, slot_desc, slot, QueryEvent::COMPLETED_ADMISSION);
break;
case TQueryTableColumn::EVENT_ALL_BACKENDS_STARTED:
- WriteEvent(query, slot, QueryEvent::ALL_BACKENDS_STARTED);
+ WriteEvent(query, slot_desc, slot, QueryEvent::ALL_BACKENDS_STARTED);
break;
case TQueryTableColumn::EVENT_ROWS_AVAILABLE:
- WriteEvent(query, slot, QueryEvent::ROWS_AVAILABLE);
+ WriteEvent(query, slot_desc, slot, QueryEvent::ROWS_AVAILABLE);
break;
case TQueryTableColumn::EVENT_FIRST_ROW_FETCHED:
- WriteEvent(query, slot, QueryEvent::FIRST_ROW_FETCHED);
+ WriteEvent(query, slot_desc, slot, QueryEvent::FIRST_ROW_FETCHED);
break;
case TQueryTableColumn::EVENT_LAST_ROW_FETCHED:
- WriteEvent(query, slot, QueryEvent::LAST_ROW_FETCHED);
+ WriteEvent(query, slot_desc, slot, QueryEvent::LAST_ROW_FETCHED);
break;
case TQueryTableColumn::EVENT_UNREGISTER_QUERY:
- WriteEvent(query, slot, QueryEvent::UNREGISTER_QUERY);
+ WriteEvent(query, slot_desc, slot, QueryEvent::UNREGISTER_QUERY);
break;
- case TQueryTableColumn::READ_IO_WAIT_TOTAL_NS:
- WriteBigIntSlot(query.read_io_wait_time_total, slot);
+ case TQueryTableColumn::READ_IO_WAIT_TOTAL_MS:
+ WriteDecimalSlot(slot_desc->type(),
+ query.read_io_wait_time_total / NANOS_TO_MILLIS, slot);
break;
- case TQueryTableColumn::READ_IO_WAIT_MEAN_NS:
- WriteBigIntSlot(query.read_io_wait_time_mean, slot);
+ case TQueryTableColumn::READ_IO_WAIT_MEAN_MS:
+ WriteDecimalSlot(slot_desc->type(),
+ query.read_io_wait_time_mean / NANOS_TO_MILLIS, slot);
break;
case TQueryTableColumn::BYTES_READ_CACHE_TOTAL:
WriteBigIntSlot(query.bytes_read_cache_total, slot);
diff --git a/be/src/service/workload-management-fields.cc
b/be/src/service/workload-management-fields.cc
index a95bb671c..072242b7c 100644
--- a/be/src/service/workload-management-fields.cc
+++ b/be/src/service/workload-management-fields.cc
@@ -30,6 +30,7 @@
#include <boost/algorithm/string.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
+#include <gutil/strings/substitute.h>
#include "common/compiler-util.h"
#include "gen-cpp/Types_types.h"
@@ -43,12 +44,38 @@
DECLARE_int32(query_log_max_sql_length);
DECLARE_int32(query_log_max_plan_length);
+using namespace std;
+using strings::Substitute;
+
namespace impala {
namespace workload_management {
/// Helper type for event timeline timestamp functions.
-using _event_compare_pred = std::function<bool(const std::string& comp)>;
+using _event_compare_pred = function<bool(const string& comp)>;
+
+// Constant declaring how to convert from micro and nano seconds to
milliseconds.
+static constexpr double MICROS_TO_MILLIS = 1000;
+static constexpr double NANOS_TO_MILLIS = 1000000;
+
+// Constants declaring how durations measured in milliseconds will be stored
in the table.
+// Must match the constants with the same name declared in SystemTable.java.
+static constexpr int8_t DURATION_DECIMAL_PRECISION = 18;
+static constexpr int8_t DURATION_DECIMAL_SCALE = 3;
+
+// SQL column type for duration columns.
+static const string MILLIS_DECIMAL_TYPE = Substitute("DECIMAL($0,$1)",
+ DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE);
+
+/// Helper function to write a decimal value to the completed queries sql
stream.
+///
+/// Parameters:
+/// `ctx` The field parse context object.
+/// `val` A value to write in the completed queries sql stream.
+/// `factor` The provided val input will be divided by this value.
+static void _write_decimal(FieldParserContext& ctx, int64_t val, double
factor) {
+ ctx.sql << "CAST(" << val / factor << " AS " << MILLIS_DECIMAL_TYPE << ")";
+}
/// Helper function to write the timestamp for a single event from the events
timeline
/// into the completed queries sql statement stream.
@@ -61,11 +88,11 @@ using _event_compare_pred = std::function<bool(const
std::string& comp)>;
static void _write_event(FieldParserContext& ctx, QueryEvent target_event) {
const auto& event = ctx.record->events.find(target_event);
DCHECK(event != ctx.record->events.end());
- ctx.sql << event->second;
+ _write_decimal(ctx, event->second, NANOS_TO_MILLIS);
}
/// List of query table columns. Must be kept in-sync with SystemTables.thrift
-const std::list<FieldDefinition> FIELD_DEFINITIONS = {
+const list<FieldDefinition> FIELD_DEFINITIONS = {
// Cluster Id
// Required
FieldDefinition("cluster_id", TPrimitiveType::STRING,
@@ -171,16 +198,16 @@ const std::list<FieldDefinition> FIELD_DEFINITIONS = {
}),
// Query Duration
- FieldDefinition("total_time_ns", TPrimitiveType::BIGINT,
+ FieldDefinition("total_time_ms", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
- ctx.sql << (ctx.record->base_state->end_time_us -
- ctx.record->base_state->start_time_us) * 1000;
- }),
+ _write_decimal(ctx, (ctx.record->base_state->end_time_us -
+ ctx.record->base_state->start_time_us), MICROS_TO_MILLIS);
+ }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Query Options set by Configuration
FieldDefinition("query_opts_config", TPrimitiveType::STRING,
[](FieldParserContext& ctx){
- const std::string opts_str =
DebugQueryOptions(ctx.record->query_options);
+ const string opts_str = DebugQueryOptions(ctx.record->query_options);
ctx.sql << "'" << EscapeSql(opts_str) << "'";
}),
@@ -271,10 +298,10 @@ const std::list<FieldDefinition> FIELD_DEFINITIONS = {
}),
// Row Materialization Time
- FieldDefinition("row_materialization_time_ns", TPrimitiveType::BIGINT,
+ FieldDefinition("row_materialization_time_ms", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
- ctx.sql << ctx.record->row_materialization_time;
- }),
+ _write_decimal(ctx, ctx.record->row_materialization_time,
NANOS_TO_MILLIS);
+ }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Compressed Bytes Spilled to Disk
FieldDefinition("compressed_bytes_spilled", TPrimitiveType::BIGINT,
@@ -283,64 +310,64 @@ const std::list<FieldDefinition> FIELD_DEFINITIONS = {
}),
// Events Timeline Planning Finished
- FieldDefinition("event_planning_finished", TPrimitiveType::BIGINT,
+ FieldDefinition("event_planning_finished", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, PLANNING_FINISHED);
- }),
+ }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Submit for Admission
- FieldDefinition("event_submit_for_admission", TPrimitiveType::BIGINT,
+ FieldDefinition("event_submit_for_admission", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, SUBMIT_FOR_ADMISSION);
- }),
+ }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Completed Admission
- FieldDefinition("event_completed_admission", TPrimitiveType::BIGINT,
+ FieldDefinition("event_completed_admission", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, COMPLETED_ADMISSION);
- }),
+ }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline All Execution Backends Started
- FieldDefinition("event_all_backends_started", TPrimitiveType::BIGINT,
+ FieldDefinition("event_all_backends_started", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, ALL_BACKENDS_STARTED);
- }),
+ }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Rows Available
- FieldDefinition("event_rows_available", TPrimitiveType::BIGINT,
+ FieldDefinition("event_rows_available", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, ROWS_AVAILABLE);
- }),
+ }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline First Row Fetched
- FieldDefinition("event_first_row_fetched", TPrimitiveType::BIGINT,
+ FieldDefinition("event_first_row_fetched", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, FIRST_ROW_FETCHED);
- }),
+ }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Last Row Fetched
- FieldDefinition("event_last_row_fetched", TPrimitiveType::BIGINT,
+ FieldDefinition("event_last_row_fetched", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, LAST_ROW_FETCHED);
- }),
+ }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Unregister Query
- FieldDefinition("event_unregister_query", TPrimitiveType::BIGINT,
+ FieldDefinition("event_unregister_query", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, UNREGISTER_QUERY);
- }),
+ }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Read IO Wait Time Total
- FieldDefinition("read_io_wait_total_ns", TPrimitiveType::BIGINT,
+ FieldDefinition("read_io_wait_total_ms", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
- ctx.sql << ctx.record->read_io_wait_time_total;
- }),
+ _write_decimal(ctx, ctx.record->read_io_wait_time_total,
NANOS_TO_MILLIS);
+ }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Read IO Wait Time Mean
- FieldDefinition("read_io_wait_mean_ns", TPrimitiveType::BIGINT,
+ FieldDefinition("read_io_wait_mean_ms", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
- ctx.sql << ctx.record->read_io_wait_time_mean;
- }),
+ _write_decimal(ctx, ctx.record->read_io_wait_time_mean,
NANOS_TO_MILLIS);
+ }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Bytes Read from the Data Cache Total
FieldDefinition("bytes_read_cache_total", TPrimitiveType::BIGINT,
diff --git a/be/src/service/workload-management-flags.cc
b/be/src/service/workload-management-flags.cc
index 7bd6249ba..81ec53bf1 100644
--- a/be/src/service/workload-management-flags.cc
+++ b/be/src/service/workload-management-flags.cc
@@ -79,7 +79,7 @@ DEFINE_int32_hidden(query_log_write_timeout_s, 0, "Specifies
the query timeout i
"seconds for inserts to the query log table. A value less than 1 indicates
to use "
"the same value as the query_log_write_interval_s flag.");
-DEFINE_int32(query_log_max_queued, 100000, "Maximum number of records that can
be queued "
+DEFINE_int32(query_log_max_queued, 10000, "Maximum number of records that can
be queued "
"before they are written to the impala query log table. This flag operates
"
"independently of the 'query_log_write_interval_s' flag. If the number of
queued "
"records reaches this value, the records will be written to the query log
table no "
diff --git a/be/src/service/workload-management.cc
b/be/src/service/workload-management.cc
index 611395071..ec8023a81 100644
--- a/be/src/service/workload-management.cc
+++ b/be/src/service/workload-management.cc
@@ -106,7 +106,13 @@ static const Status SetupDbTable(InternalServer* server,
const string& table_nam
create_table_sql << "CREATE TABLE IF NOT EXISTS " << table_name << "(";
for (const auto& field : FIELD_DEFINITIONS) {
- create_table_sql << field.db_column_name << " " << field.db_column_type <<
",";
+ create_table_sql << field.db_column_name << " " << field.db_column_type;
+
+ if (field.db_column_type == TPrimitiveType::DECIMAL) {
+ create_table_sql << "(" << field.precision << "," << field.scale << ")";
+ }
+
+ create_table_sql << ",";
}
create_table_sql.move_back();
diff --git a/be/src/service/workload-management.h
b/be/src/service/workload-management.h
index 8305e1e9b..a5c865f12 100644
--- a/be/src/service/workload-management.h
+++ b/be/src/service/workload-management.h
@@ -53,10 +53,14 @@ struct FieldDefinition {
const std::string db_column_name;
const TPrimitiveType::type db_column_type;
const FieldParser parser;
+ const int16_t precision;
+ const int16_t scale;
FieldDefinition(const std::string db_col_name, const TPrimitiveType::type
db_col_type,
- const FieldParser fp) : db_column_name(std::move(db_col_name)),
- db_column_type(std::move(db_col_type)), parser(std::move(fp)) {}
+ const FieldParser fp, const int16_t precision = 0,
+ const int16_t scale = 0) : db_column_name(std::move(db_col_name)),
+ db_column_type(std::move(db_col_type)), parser(std::move(fp)),
+ precision(precision), scale(scale) {}
}; // struct FieldDefinition
/// This list is the main data structure for workload management. Each list
entry
diff --git a/common/thrift/SystemTables.thrift
b/common/thrift/SystemTables.thrift
index 3ca81db47..27563d7bb 100644
--- a/common/thrift/SystemTables.thrift
+++ b/common/thrift/SystemTables.thrift
@@ -35,7 +35,7 @@ enum TQueryTableColumn {
QUERY_TYPE
NETWORK_ADDRESS
START_TIME_UTC
- TOTAL_TIME_NS
+ TOTAL_TIME_MS
QUERY_OPTS_CONFIG
RESOURCE_POOL
PER_HOST_MEM_ESTIMATE
@@ -49,7 +49,7 @@ enum TQueryTableColumn {
EXEC_SUMMARY
NUM_ROWS_FETCHED
ROW_MATERIALIZATION_ROWS_PER_SEC
- ROW_MATERIALIZATION_TIME_NS
+ ROW_MATERIALIZATION_TIME_MS
COMPRESSED_BYTES_SPILLED
EVENT_PLANNING_FINISHED
EVENT_SUBMIT_FOR_ADMISSION
@@ -59,8 +59,8 @@ enum TQueryTableColumn {
EVENT_FIRST_ROW_FETCHED
EVENT_LAST_ROW_FETCHED
EVENT_UNREGISTER_QUERY
- READ_IO_WAIT_TOTAL_NS
- READ_IO_WAIT_MEAN_NS
+ READ_IO_WAIT_TOTAL_MS
+ READ_IO_WAIT_MEAN_MS
BYTES_READ_CACHE_TOTAL
BYTES_READ_TOTAL
PERNODE_PEAK_MEM_MIN
diff --git a/fe/src/main/java/org/apache/impala/catalog/SystemTable.java
b/fe/src/main/java/org/apache/impala/catalog/SystemTable.java
index 3f5f59da4..62b8d16ac 100644
--- a/fe/src/main/java/org/apache/impala/catalog/SystemTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/SystemTable.java
@@ -55,6 +55,11 @@ public final class SystemTable extends Table {
private static final Map<String, TSystemTableName> SYSTEM_TABLE_NAME_MAP =
ImmutableMap.of(QUERY_LIVE, TSystemTableName.QUERY_LIVE);
+ // Constants declaring how durations measured in milliseconds will be stored
in the db.
+ // Must match constants with the same name declared in
workload-management-fields.cc.
+ private static final int DURATION_DECIMAL_PRECISION = 18;
+ private static final int DURATION_DECIMAL_SCALE = 3;
+
private SystemTable(org.apache.hadoop.hive.metastore.api.Table msTable, Db
db,
String name, String owner) {
super(msTable, db, name, owner);
@@ -65,24 +70,12 @@ public final class SystemTable extends Table {
switch (column) {
case START_TIME_UTC:
return Type.TIMESTAMP;
- case TOTAL_TIME_NS:
case PER_HOST_MEM_ESTIMATE:
case DEDICATED_COORD_MEM_ESTIMATE:
case CLUSTER_MEMORY_ADMITTED:
case NUM_ROWS_FETCHED:
case ROW_MATERIALIZATION_ROWS_PER_SEC:
- case ROW_MATERIALIZATION_TIME_NS:
case COMPRESSED_BYTES_SPILLED:
- case EVENT_PLANNING_FINISHED:
- case EVENT_SUBMIT_FOR_ADMISSION:
- case EVENT_COMPLETED_ADMISSION:
- case EVENT_ALL_BACKENDS_STARTED:
- case EVENT_ROWS_AVAILABLE:
- case EVENT_FIRST_ROW_FETCHED:
- case EVENT_LAST_ROW_FETCHED:
- case EVENT_UNREGISTER_QUERY:
- case READ_IO_WAIT_TOTAL_NS:
- case READ_IO_WAIT_MEAN_NS:
case BYTES_READ_CACHE_TOTAL:
case BYTES_READ_TOTAL:
case PERNODE_PEAK_MEM_MIN:
@@ -91,6 +84,20 @@ public final class SystemTable extends Table {
return Type.BIGINT;
case BACKENDS_COUNT:
return Type.INT;
+ case TOTAL_TIME_MS:
+ case ROW_MATERIALIZATION_TIME_MS:
+ case EVENT_PLANNING_FINISHED:
+ case EVENT_SUBMIT_FOR_ADMISSION:
+ case EVENT_COMPLETED_ADMISSION:
+ case EVENT_ALL_BACKENDS_STARTED:
+ case EVENT_ROWS_AVAILABLE:
+ case EVENT_FIRST_ROW_FETCHED:
+ case EVENT_LAST_ROW_FETCHED:
+ case EVENT_UNREGISTER_QUERY:
+ case READ_IO_WAIT_TOTAL_MS:
+ case READ_IO_WAIT_MEAN_MS:
+ return ScalarType.createDecimalType(
+ DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE);
default:
return Type.STRING;
}
diff --git a/tests/custom_cluster/test_query_live.py
b/tests/custom_cluster/test_query_live.py
index ca44122e0..93bdfaadb 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -82,7 +82,7 @@ class TestQueryLive(CustomClusterTestSuite):
# query filtering
result3 = self.execute_query(
"select query_id from sys.impala_query_live "
- "where total_time_ns > 0.0 order by start_time_utc")
+ "where total_time_ms > 0.0 order by start_time_utc")
assert len(result3.data) == 4
assert result1.query_id == result3.data[0]
assert result2.query_id == result3.data[2]
@@ -184,7 +184,7 @@ class TestQueryLive(CustomClusterTestSuite):
assert result.data[0] == result2.data[0]
def remove_dynamic_fields(fields):
- # Excludes QUERY_STATE, IMPALA_QUERY_END_STATE, QUERY_TYPE,
TOTAL_TIME_NS, and
+ # Excludes QUERY_STATE, IMPALA_QUERY_END_STATE, QUERY_TYPE,
TOTAL_TIME_MS, and
# everything after QUERY_OPTS_CONFIG as they change over the course of
compiling
# and running the query.
return fields[:10] + fields[13:15] + fields[16:17]
@@ -297,7 +297,7 @@ class TestQueryLive(CustomClusterTestSuite):
logged = self.execute_query_expect_success(self.client,
'select count(*) from functional.alltypes')
self.cluster.get_first_impalad().service.wait_for_metric_value(
- "impala-server.completed-queries.written", 1, 30, allow_greater=True)
+ "impala-server.completed-queries.written", 2, 30, allow_greater=True)
query = """select query_id from
(select query_id, start_time_utc from sys.impala_query_live live
diff --git a/tests/util/assert_time.py b/tests/util/assert_time.py
index 7c6bc3f72..ba628f900 100644
--- a/tests/util/assert_time.py
+++ b/tests/util/assert_time.py
@@ -15,24 +15,25 @@
# specific language governing permissions and limitations
# under the License.
-def assert_time_str(expected_str, actual_time_ns, msg, tolerance=0.005):
- """Asserts a pretty printed time string matches a specific number of
nanoseconds."""
+def assert_time_str(expected_str, actual_time_ms, msg, tolerance=0.005):
+ """Asserts a pretty printed time string matches a specific number of
milliseconds."""
- total_nanoseconds = convert_to_nanos(expected_str)
- actual_time_ns = int(actual_time_ns)
+ total_ms = convert_to_milliseconds(expected_str)
+ actual_time_ms = float(actual_time_ms)
- expected_min = total_nanoseconds - (total_nanoseconds * tolerance)
- expected_max = total_nanoseconds + (total_nanoseconds * tolerance)
- assert expected_min <= actual_time_ns <= expected_max, \
+ expected_min = total_ms - (total_ms * tolerance)
+ expected_max = total_ms + (total_ms * tolerance)
+ assert expected_min <= actual_time_ms <= expected_max, \
"{0} -- expected: {1}, actual: {2}, calculated: {3}, tolerance: {4}" \
- .format(msg, expected_str, actual_time_ns, total_nanoseconds, tolerance)
+ .format(msg, expected_str, actual_time_ms, total_ms, tolerance)
-def convert_to_nanos(time_str):
- """Convert a pretty printed time string into integer nanoseconds."""
- units = {'h': 3600 * 1e9, 'm': 60 * 1e9, 's': 1e9, 'ms': 1e6, 'us': 1e3,
'ns': 1}
+def convert_to_milliseconds(time_str):
+ """Convert a pretty printed time string into a float with up to three digits
for the
+ decimal places."""
+ units = {'h': 3600000, 'm': 60000, 's': 1000, 'ms': 1, 'us': 1e-3, 'ns':
1e-6}
- total_nanoseconds = 0
+ total_ms = 0.0
current_number = ''
current_unit = ''
@@ -40,7 +41,7 @@ def convert_to_nanos(time_str):
if char.isdigit() or char == '.':
if current_unit != '':
if current_unit in units:
- total_nanoseconds += int(float(current_number) * units[current_unit])
+ total_ms += float(current_number) * units[current_unit]
current_number = ''
current_unit = ''
else:
@@ -52,6 +53,8 @@ def convert_to_nanos(time_str):
else:
raise ValueError("Invalid character in time string")
- total_nanoseconds += int(float(current_number) * units[current_unit])
+ total_ms += float(current_number) * units[current_unit]
- return total_nanoseconds
+ # The differences between round in Python 2 and Python 3 do not matter here.
+ # pylint: disable=round-builtin
+ return round(total_ms, 3)
diff --git a/tests/util/workload_management.py
b/tests/util/workload_management.py
index 2ee3eba3f..4cdacfe54 100644
--- a/tests/util/workload_management.py
+++ b/tests/util/workload_management.py
@@ -21,13 +21,12 @@ import re
import requests
from datetime import datetime
-from tests.util.assert_time import assert_time_str, convert_to_nanos
+from tests.util.assert_time import assert_time_str, convert_to_milliseconds
from tests.util.memory import assert_byte_str, convert_to_bytes
DEDICATED_COORD_SAFETY_BUFFER_BYTES = 104857600
EXPECTED_QUERY_COLS = 49
-
CLUSTER_ID = "CLUSTER_ID"
QUERY_ID = "QUERY_ID"
SESSION_ID = "SESSION_ID"
@@ -43,7 +42,7 @@ IMPALA_QUERY_END_STATE = "IMPALA_QUERY_END_STATE"
QUERY_TYPE = "QUERY_TYPE"
NETWORK_ADDRESS = "NETWORK_ADDRESS"
START_TIME_UTC = "START_TIME_UTC"
-TOTAL_TIME_NS = "TOTAL_TIME_NS"
+TOTAL_TIME_MS = "TOTAL_TIME_MS"
QUERY_OPTS_CONFIG = "QUERY_OPTS_CONFIG"
RESOURCE_POOL = "RESOURCE_POOL"
PER_HOST_MEM_ESTIMATE = "PER_HOST_MEM_ESTIMATE"
@@ -57,7 +56,7 @@ EXECUTOR_GROUPS = "EXECUTOR_GROUPS"
EXEC_SUMMARY = "EXEC_SUMMARY"
NUM_ROWS_FETCHED = "NUM_ROWS_FETCHED"
ROW_MATERIALIZATION_ROWS_PER_SEC = "ROW_MATERIALIZATION_ROWS_PER_SEC"
-ROW_MATERIALIZATION_TIME_NS = "ROW_MATERIALIZATION_TIME_NS"
+ROW_MATERIALIZATION_TIME_MS = "ROW_MATERIALIZATION_TIME_MS"
COMPRESSED_BYTES_SPILLED = "COMPRESSED_BYTES_SPILLED"
EVENT_PLANNING_FINISHED = "EVENT_PLANNING_FINISHED"
EVENT_SUBMIT_FOR_ADMISSION = "EVENT_SUBMIT_FOR_ADMISSION"
@@ -67,8 +66,8 @@ EVENT_ROWS_AVAILABLE = "EVENT_ROWS_AVAILABLE"
EVENT_FIRST_ROW_FETCHED = "EVENT_FIRST_ROW_FETCHED"
EVENT_LAST_ROW_FETCHED = "EVENT_LAST_ROW_FETCHED"
EVENT_UNREGISTER_QUERY = "EVENT_UNREGISTER_QUERY"
-READ_IO_WAIT_TOTAL_NS = "READ_IO_WAIT_TOTAL_NS"
-READ_IO_WAIT_MEAN_NS = "READ_IO_WAIT_MEAN_NS"
+READ_IO_WAIT_TOTAL_MS = "READ_IO_WAIT_TOTAL_MS"
+READ_IO_WAIT_MEAN_MS = "READ_IO_WAIT_MEAN_MS"
BYTES_READ_CACHE_TOTAL = "BYTES_READ_CACHE_TOTAL"
BYTES_READ_TOTAL = "BYTES_READ_TOTAL"
PERNODE_PEAK_MEM_MIN = "PERNODE_PEAK_MEM_MIN"
@@ -79,6 +78,12 @@ PLAN = "PLAN"
TABLES_QUERIED = "TABLES_QUERIED"
+def round_to_3(val):
+ # The differences between round in Python 2 and Python 3 do not matter here.
+ # pylint: disable=round-builtin
+ return round(val, 3)
+
+
def assert_query(query_tbl, client, expected_cluster_id, raw_profile=None,
impalad=None,
query_id=None, max_mem_for_admission=None, max_row_size=None):
"""Helper function to assert that the values in the completed query log table
@@ -262,12 +267,12 @@ def assert_query(query_tbl, client, expected_cluster_id,
raw_profile=None, impal
# Query Duration (allow values that are within 1 second)
index += 1
- assert sql_results.column_labels[index] == TOTAL_TIME_NS
- ret_data[TOTAL_TIME_NS] = data[index]
+ assert sql_results.column_labels[index] == TOTAL_TIME_MS
+ ret_data[TOTAL_TIME_MS] = data[index]
duration = end_time_obj - start_time_obj
- min_allowed = int(duration.total_seconds() * 1000000000) - 1
- max_allowed = min_allowed + 2
- assert min_allowed <= int(data[index]) <= max_allowed, "total time incorrect"
+ min_allowed = round_to_3(duration.total_seconds() * 1000 * 0.999)
+ max_allowed = round_to_3(duration.total_seconds() * 1000 * 1.001)
+ assert min_allowed <= float(data[index]) <= max_allowed, "total time
incorrect"
# Query Options Set By Configuration
index += 1
@@ -467,8 +472,8 @@ def assert_query(query_tbl, client, expected_cluster_id,
raw_profile=None, impal
# Row Materialization Time
index += 1
- assert sql_results.column_labels[index] == ROW_MATERIALIZATION_TIME_NS
- ret_data[ROW_MATERIALIZATION_TIME_NS] = data[index]
+ assert sql_results.column_labels[index] == ROW_MATERIALIZATION_TIME_MS
+ ret_data[ROW_MATERIALIZATION_TIME_MS] = data[index]
row_mat_tmr = re.search(r'\n\s+\-\s+RowMaterializationTimer:\s+(.*?)\n',
profile_text)
if query_state_value == "EXCEPTION":
assert row_mat_tmr is None
@@ -477,7 +482,7 @@ def assert_query(query_tbl, client, expected_cluster_id,
raw_profile=None, impal
assert row_mat_tmr.group(1) == "0.000ns", "row materialization timer
incorrect"
else:
assert row_mat_tmr is not None
- assert_time_str(row_mat_tmr.group(1), (int(data[index])),
+ assert_time_str(row_mat_tmr.group(1), data[index],
"row materialization time incorrect")
# Compressed Bytes Spilled
@@ -499,7 +504,7 @@ def assert_query(query_tbl, client, expected_cluster_id,
raw_profile=None, impal
assert sql_results.column_labels[index] == EVENT_PLANNING_FINISHED
ret_data[EVENT_PLANNING_FINISHED] = data[index]
if query_state_value == "EXCEPTION":
- assert data[index] == "0", "planning finished event incorrect"
+ assert data[index] == "0.000", "planning finished event incorrect"
else:
event = re.search(r'\n\s+\-\s+Planning finished:\s+(\S+)', timeline)
assert event is not None, "planning finished event missing"
@@ -510,7 +515,7 @@ def assert_query(query_tbl, client, expected_cluster_id,
raw_profile=None, impal
assert sql_results.column_labels[index] == EVENT_SUBMIT_FOR_ADMISSION
ret_data[EVENT_SUBMIT_FOR_ADMISSION] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DDL":
- assert data[index] == "0", "submit for admission event incorrect"
+ assert data[index] == "0.000", "submit for admission event incorrect"
else:
event = re.search(r'\n\s+\-\s+Submit for admission:\s+(\S+)', timeline)
assert event is not None, "submit for admission event missing"
@@ -521,7 +526,7 @@ def assert_query(query_tbl, client, expected_cluster_id,
raw_profile=None, impal
assert sql_results.column_labels[index] == EVENT_COMPLETED_ADMISSION
ret_data[EVENT_COMPLETED_ADMISSION] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DDL":
- assert data[index] == "0", "completed admission event incorrect"
+ assert data[index] == "0.000", "completed admission event incorrect"
else:
event = re.search(r'\n\s+\-\s+Completed admission:\s+(\S+)', timeline)
assert event is not None, "completed admission event missing"
@@ -532,7 +537,7 @@ def assert_query(query_tbl, client, expected_cluster_id,
raw_profile=None, impal
assert sql_results.column_labels[index] == EVENT_ALL_BACKENDS_STARTED
ret_data[EVENT_ALL_BACKENDS_STARTED] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DDL":
- assert data[index] == "0", "all backends started event incorrect"
+ assert data[index] == "0.000", "all backends started event incorrect"
else:
event = re.search(r'\n\s+\-\s+All \d+ execution backends \(\d+ fragment
instances\)'
r' started:\s+(\S+)', timeline)
@@ -544,7 +549,7 @@ def assert_query(query_tbl, client, expected_cluster_id,
raw_profile=None, impal
assert sql_results.column_labels[index] == EVENT_ROWS_AVAILABLE
ret_data[EVENT_ROWS_AVAILABLE] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DML":
- assert data[index] == "0", "rows available event incorrect"
+ assert data[index] == "0.000", "rows available event incorrect"
else:
event = re.search(r'\n\s+\-\s+Rows available:\s+(\S+)', timeline)
assert event is not None, "rows available event missing"
@@ -555,7 +560,7 @@ def assert_query(query_tbl, client, expected_cluster_id,
raw_profile=None, impal
assert sql_results.column_labels[index] == EVENT_FIRST_ROW_FETCHED
ret_data[EVENT_FIRST_ROW_FETCHED] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DDL" or query_type ==
"DML":
- assert data[index] == "0", "first row fetched event incorrect"
+ assert data[index] == "0.000", "first row fetched event incorrect"
else:
event = re.search(r'\n\s+\-\s+First row fetched:\s+(\S+)', timeline)
assert event is not None, "first row fetched event missing"
@@ -566,7 +571,7 @@ def assert_query(query_tbl, client, expected_cluster_id,
raw_profile=None, impal
assert sql_results.column_labels[index] == EVENT_LAST_ROW_FETCHED
ret_data[EVENT_LAST_ROW_FETCHED] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DDL":
- assert data[index] == "0", "last row fetched event incorrect"
+ assert data[index] == "0.000", "last row fetched event incorrect"
else:
event = re.search(r'\n\s+\-\s+Last row fetched:\s+(\S+)', timeline)
assert event is not None, "last row fetched event missing"
@@ -582,33 +587,33 @@ def assert_query(query_tbl, client, expected_cluster_id,
raw_profile=None, impal
# Read IO Wait Total
index += 1
- assert sql_results.column_labels[index] == READ_IO_WAIT_TOTAL_NS
- ret_data[READ_IO_WAIT_TOTAL_NS] = data[index]
+ assert sql_results.column_labels[index] == READ_IO_WAIT_TOTAL_MS
+ ret_data[READ_IO_WAIT_TOTAL_MS] = data[index]
total_read_wait = 0
if (query_state_value != "EXCEPTION" and query_type == "QUERY") or
data[index] != "0":
re_wait_time = re.compile(r'^\s+\-\s+ScannerIoWaitTime:\s+(.*?)$')
read_waits = assert_scan_node_metrics(re_wait_time, profile_lines)
for r in read_waits:
- total_read_wait += int(convert_to_nanos(r))
+ total_read_wait += convert_to_milliseconds(r)
tolerance = total_read_wait * 0.001
- assert total_read_wait - tolerance <= int(data[index]) <= \
+ assert total_read_wait - tolerance <= float(data[index]) <= \
total_read_wait + tolerance, "read io wait time total incorrect"
else:
- assert data[index] == "0"
+ assert data[index] == "0.000"
# Read IO Wait Average
index += 1
- assert sql_results.column_labels[index] == READ_IO_WAIT_MEAN_NS
- ret_data[READ_IO_WAIT_MEAN_NS] = data[index]
+ assert sql_results.column_labels[index] == READ_IO_WAIT_MEAN_MS
+ ret_data[READ_IO_WAIT_MEAN_MS] = data[index]
if (query_state_value != "EXCEPTION" and query_type == "QUERY"
- and len(read_waits) != 0) or data[index] != "0":
- avg_read_wait = int(total_read_wait / len(read_waits))
- assert avg_read_wait - tolerance <= int(data[index]) <= avg_read_wait +
tolerance, \
+ and len(read_waits) != 0) or data[index] != "0.000":
+ avg_read_wait = round_to_3(float(total_read_wait / len(read_waits)))
+ assert avg_read_wait - tolerance <= float(data[index]) <= avg_read_wait +
tolerance, \
"read io wait time average incorrect"
else:
- assert data[index] == "0"
+ assert data[index] == "0.000"
# Total Bytes Read From Cache
index += 1