This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 97cd30c60 IMPALA-12426: Workload Management Supporting Changes
97cd30c60 is described below
commit 97cd30c607a274d25f5636a21461fc7441d5e63e
Author: jasonmfehr <[email protected]>
AuthorDate: Wed Feb 21 13:41:08 2024 -0800
IMPALA-12426: Workload Management Supporting Changes
Contains several disparate pieces of functionality that support the
overall workload management work.
1. TNetworkAddressComparator
An unused comparator for the Thrift class TNetworkAddress
already existed in the thrift-util.h file. This comparator has been
moved to the network-util.h file where it now resides in the same
place as other utility functions that operate on TNetworkAddress
instances.
The existing comparator did not consider the uds address. It only
considered hostname and port. The new comparator considers all three.
Testing is accomplished by porting the existing ctests and adding
additional ctests.
2. StringStreamPop
This new class extends a std::basic_stringstream<char> to add a
function that enables removing a character from the end.
Testing is accomplished using new ctests.
3. Ticker
This new header-only class notifies a condition variable at periodic
intervals. It is a lightweight class that sleeps until the configured
duration has passed, at which point it wakes up and notifies the
condition variable. It also enables consumers to offload spurious
wakeup guards to this class.
Ctests have been added to test the functionality of this new class.
4. TUniqueId Empty Utility Function
A new function UUIDEmpty returns true if a provided TUniqueId does
not contain a UUID (both its hi and lo fields are zero) or false otherwise.
Ctests have been added to test this new function.
5. run_clang_tidy.sh
Additional informational outputs have been added to this script to
enable tracking the status of the script and to easily identify
errors found by clang tidy.
6. Summary Util Test
A ctest was developed for testing the text table generation code that
generates the exec summary portion of the query profile. This ctest
was developed as part of an idea that did not ultimately pan out.
Rather than throwing away that test code, it has been added as a new
ctest.
Change-Id: Iee23334ec56a18b192a75d052468bf59159b6424
Reviewed-on: http://gerrit.cloudera.org:8080/21048
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/rpc/thrift-util-test.cc | 14 ----
be/src/rpc/thrift-util.cc | 7 --
be/src/rpc/thrift-util.h | 4 --
be/src/util/CMakeLists.txt | 8 ++-
be/src/util/network-util-test.cc | 100 +++++++++++++++++++++++++++
be/src/util/network-util.cc | 21 ++++++
be/src/util/network-util.h | 6 ++
be/src/util/string-util-test.cc | 73 ++++++++++++++++++++
be/src/util/string-util.cc | 7 ++
be/src/util/string-util.h | 15 +++-
be/src/util/summary-util-test.cc | 113 +++++++++++++++++++++++++++++++
be/src/util/ticker-test.cc | 133 ++++++++++++++++++++++++++++++++++++
be/src/util/ticker.h | 143 +++++++++++++++++++++++++++++++++++++++
be/src/util/uid-util-test.cc | 23 +++++++
be/src/util/uid-util.h | 5 ++
bin/run_clang_tidy.sh | 3 +-
16 files changed, 647 insertions(+), 28 deletions(-)
diff --git a/be/src/rpc/thrift-util-test.cc b/be/src/rpc/thrift-util-test.cc
index 4d1a580b6..427cb3d42 100644
--- a/be/src/rpc/thrift-util-test.cc
+++ b/be/src/rpc/thrift-util-test.cc
@@ -129,18 +129,4 @@ TEST(ThriftUtil, SerDeBuffer100MB) {
}
}
-TEST(ThriftUtil, TNetworkAddressComparator) {
- EXPECT_TRUE(TNetworkAddressComparator(MakeNetworkAddress("aaaa", 500),
- MakeNetworkAddress("zzzz", 500)));
- EXPECT_FALSE(TNetworkAddressComparator(MakeNetworkAddress("zzzz", 500),
- MakeNetworkAddress("aaaa", 500)));
- EXPECT_TRUE(TNetworkAddressComparator(MakeNetworkAddress("aaaa", 500),
- MakeNetworkAddress("aaaa", 501)));
- EXPECT_FALSE(TNetworkAddressComparator(MakeNetworkAddress("aaaa", 501),
- MakeNetworkAddress("aaaa", 500)));
-
- EXPECT_FALSE(TNetworkAddressComparator(MakeNetworkAddress("aaaa", 500),
- MakeNetworkAddress("aaaa", 500)));
-}
-
}
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc
index ce88b9b9b..973544cca 100644
--- a/be/src/rpc/thrift-util.cc
+++ b/be/src/rpc/thrift-util.cc
@@ -253,13 +253,6 @@ void PrintTColumnValue(std::ostream& out, const
TColumnValue& colval) {
}
}
-bool TNetworkAddressComparator(const TNetworkAddress& a, const
TNetworkAddress& b) {
- int cmp = a.hostname.compare(b.hostname);
- if (cmp < 0) return true;
- if (cmp == 0) return a.port < b.port;
- return false;
-}
-
bool IsReadTimeoutTException(const TTransportException& e) {
// String taken from TSocket::read() Thrift's TSocket.cpp and TSSLSocket.cpp.
// Specifically, "THRIFT_EAGAIN (timed out)" from TSocket.cpp,
diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h
index 37a782b39..0945c1278 100644
--- a/be/src/rpc/thrift-util.h
+++ b/be/src/rpc/thrift-util.h
@@ -175,10 +175,6 @@ Status WaitForServer(const std::string& host, int port,
int num_retries,
/// Print a TColumnValue. If null, print "NULL".
void PrintTColumnValue(std::ostream& out, const TColumnValue& colval);
-/// Compares two TNetworkAddresses alphanumerically by their host:port
-/// string representation
-bool TNetworkAddressComparator(const TNetworkAddress& a, const
TNetworkAddress& b);
-
/// Returns true if the TTransportException corresponds to a TCP socket read
timeout.
bool IsReadTimeoutTException(const
apache::thrift::transport::TTransportException& e);
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 3dbbbbdea..ab913f027 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -191,6 +191,7 @@ add_library(UtilTests STATIC
lru-multi-cache-test.cc
metrics-test.cc
min-max-filter-test.cc
+ network-util-test.cc
openssl-util-test.cc
os-info-test.cc
os-util-test.cc
@@ -208,11 +209,13 @@ add_library(UtilTests STATIC
simple-logger-test.cc
string-parser-test.cc
string-util-test.cc
+ summary-util-test.cc
symbols-util-test.cc
sys-info-test.cc
system-state-info-test.cc
tagged-ptr-test.cc
thread-pool-test.cc
+ ticker-test.cc
time-test.cc
tuple-row-compare-test.cc
uid-util-test.cc
@@ -250,6 +253,7 @@ ADD_UNIFIED_BE_LSAN_TEST(metrics-test "MetricsTest.*")
ADD_UNIFIED_BE_LSAN_TEST(min-max-filter-test "MinMaxFilterTest.*")
# minidump-test is flaky when the jvm pause monitor is running. So it can't be
unified.
ADD_BE_LSAN_TEST(minidump-test)
+ADD_UNIFIED_BE_LSAN_TEST(network-util-test "NetworkUtil.*")
ADD_UNIFIED_BE_LSAN_TEST(openssl-util-test "OpenSSLUtilTest.*")
ADD_UNIFIED_BE_LSAN_TEST(os-info-test "OsInfo.*")
ADD_UNIFIED_BE_LSAN_TEST(os-util-test "OsUtil.*")
@@ -267,11 +271,13 @@ ADD_UNIFIED_BE_LSAN_TEST(rle-test "BitArray.*:RleTest.*")
ADD_UNIFIED_BE_LSAN_TEST(runtime-profile-test
"CountersTest.*:TimerCounterTest.*:TimeSeriesCounterTest.*:VariousNumbers/TimeSeriesCounterResampleTest.*:ToThrift.*:ToJson.*")
ADD_UNIFIED_BE_LSAN_TEST(simple-logger-test "SimpleLoggerTest.*")
ADD_UNIFIED_BE_LSAN_TEST(string-parser-test
"StringToInt.*:StringToIntWithBase.*:StringToFloat.*:StringToBool.*:StringToDate.*")
-ADD_UNIFIED_BE_LSAN_TEST(string-util-test
"TruncateDownTest.*:TruncateUpTest.*:CommaSeparatedContainsTest.*:FindUtf8PosForwardTest.*:FindUtf8PosBackwardTest.*:RandomFindUtf8PosTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(string-util-test
"TruncateDownTest.*:TruncateUpTest.*:CommaSeparatedContainsTest.*:FindUtf8PosForwardTest.*:FindUtf8PosBackwardTest.*:RandomFindUtf8PosTest.*:StringStreamPopTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(summary-util-test "PrintTableTest.*")
ADD_UNIFIED_BE_LSAN_TEST(symbols-util-test "SymbolsUtil.*")
ADD_UNIFIED_BE_LSAN_TEST(system-state-info-test "SystemStateInfoTest.*")
ADD_UNIFIED_BE_LSAN_TEST(sys-info-test "CpuInfoTest.*:DiskInfoTest.*")
ADD_UNIFIED_BE_LSAN_TEST(thread-pool-test "ThreadPoolTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(ticker-test "TickerTest.*")
ADD_UNIFIED_BE_LSAN_TEST(time-test "TimeTest.*")
ADD_UNIFIED_BE_LSAN_TEST(tuple-row-compare-test "TupleRowCompareTest.*")
ADD_UNIFIED_BE_LSAN_TEST(uid-util-test "UidUtil.*")
diff --git a/be/src/util/network-util-test.cc b/be/src/util/network-util-test.cc
new file mode 100644
index 000000000..c255466cc
--- /dev/null
+++ b/be/src/util/network-util-test.cc
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gen-cpp/Types_types.h"
+
+#include "testutil/gtest-util.h"
+#include "util/network-util.h"
+
+namespace impala {
+
+// NetAddrComp Tests: These tests assert the TNetworkAddressComparator sorts
two
+// TNetworkAddress objects correctly based on their host, port, and uds
address fields.
+
+// Assert where host fields are different.
+TEST(NetworkUtil, NetAddrCompHostnameDiff) {
+ TNetworkAddressComparator fixture;
+ TNetworkAddress first;
+ TNetworkAddress second;
+
+ first.__set_hostname("aaaa");
+ first.__set_uds_address("uds");
+ first.__set_port(0);
+
+ second.__set_hostname("bbbb");
+ second.__set_uds_address("uds");
+ second.__set_port(0);
+
+ ASSERT_TRUE(fixture(first, second));
+ ASSERT_FALSE(fixture(second, first));
+}
+
+// Assert where host fields are equal but port is different.
+TEST(NetworkUtil, NetAddrCompPortDiff) {
+ TNetworkAddressComparator fixture;
+ TNetworkAddress first;
+ TNetworkAddress second;
+
+ first.__set_hostname("host");
+ first.__set_port(0);
+ first.__set_uds_address("");
+
+ second.__set_hostname("host");
+ second.__set_port(1);
+ second.__set_uds_address("");
+
+ ASSERT_TRUE(fixture(first, second));
+ ASSERT_FALSE(fixture(second, first));
+}
+
+// Assert where host and port fields are equal but uds address is different.
+TEST(NetworkUtil, NetAddrCompUDSAddrDiff) {
+ TNetworkAddressComparator fixture;
+ TNetworkAddress first;
+ TNetworkAddress second;
+
+ first.__set_hostname("host");
+ first.__set_port(0);
+ first.__set_uds_address("aaaa");
+
+ second.__set_hostname("host");
+ second.__set_port(0);
+ second.__set_uds_address("bbbb");
+
+ ASSERT_TRUE(fixture(first, second));
+ ASSERT_FALSE(fixture(second, first));
+}
+
+// Assert where all three comparison fields are equal.
+TEST(NetworkUtil, NetAddrUDSAddrSame) {
+ TNetworkAddressComparator fixture;
+ TNetworkAddress first;
+ TNetworkAddress second;
+
+ first.__set_hostname("host");
+ first.__set_port(0);
+ first.__set_uds_address("uds");
+
+ second.__set_hostname("host");
+ second.__set_port(0);
+ second.__set_uds_address("uds");
+
+ ASSERT_FALSE(fixture(first, second));
+ ASSERT_FALSE(fixture(second, first));
+}
+
+} // namespace impala
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index f0c352abe..80b2cde0a 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -252,6 +252,27 @@ NetworkAddressPB FromTNetworkAddress(const
TNetworkAddress& address) {
return address_pb;
}
+bool TNetworkAddressComparator::operator()(const TNetworkAddress& a,
+ const TNetworkAddress& b) const {
+ const int host_compare = a.hostname.compare(b.hostname);
+
+ if (host_compare < 0) {
+ return true;
+ } else if(host_compare > 0) {
+ return false;
+ }
+
+ // Hostnames were the same, compare on port
+ if (a.port < b.port) {
+ return true;
+ } else if (a.port > b.port) {
+ return false;
+ }
+
+ // Hostnames and ports were the same, compare on uds address.
+ return a.uds_address.compare(b.uds_address) < 0;
+}
+
/// Pick a random port in the range of ephemeral ports
/// https://tools.ietf.org/html/rfc6335
int FindUnusedEphemeralPort() {
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 600bb7396..359cc11f7 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -99,6 +99,12 @@ NetworkAddressPB FromTNetworkAddress(const TNetworkAddress&
address);
Status NetworkAddressPBToSockaddr(
const NetworkAddressPB& address, bool use_uds, kudu::Sockaddr* sockaddr);
+/// Custom comparator to sort network addresses first by host (alphabetically)
and then
+/// by port (numerically) and finally by uds address (alphabetically).
+struct TNetworkAddressComparator {
+ bool operator()(const TNetworkAddress& a, const TNetworkAddress& b) const;
+};
+
/// Returns a ephemeral port that is currently unused. Returns -1 on an error
or if
/// a free ephemeral port can't be found after 100 tries.
int FindUnusedEphemeralPort();
diff --git a/be/src/util/string-util-test.cc b/be/src/util/string-util-test.cc
index 6d88b8812..6e2c6b07b 100644
--- a/be/src/util/string-util-test.cc
+++ b/be/src/util/string-util-test.cc
@@ -269,5 +269,78 @@ TEST(RandomFindUtf8PosTest, Basic) {
}
}
+// StringStreamPopTest: These tests assert the functionality of the
StringStreamPop class.
+
+// Assert the most common use case where the last character is popped and a
new character
+// is written to the stream.
+TEST(StringStreamPopTest, NotEmptyPopOnce) {
+ StringStreamPop fixture;
+ fixture << "this is a tes,";
+ fixture.move_back();
+ fixture << "t";
+ EXPECT_EQ("this is a test", fixture.str());
+}
+
+// Assert where the stream only contains a single character that is popped
before another
+// character is written to the stream.
+TEST(StringStreamPopTest, OneCharPop) {
+ StringStreamPop fixture;
+ fixture << "t";
+ fixture.move_back();
+ fixture << "v";
+ EXPECT_EQ("v", fixture.str());
+}
+
+// Assert where the last two characters of a non-empty stream are popped.
+TEST(StringStreamPopTest, NotEmptyPopTwice) {
+ StringStreamPop fixture;
+ fixture << "this is a second te,,";
+ fixture.move_back();
+ fixture.move_back();
+ fixture << "st";
+ EXPECT_EQ("this is a second test", fixture.str());
+}
+
+// Assert where an empty stream has its last (nonexistent) character popped.
+TEST(StringStreamPopTest, EmptyPopOnce) {
+ StringStreamPop fixture;
+ fixture.move_back();
+ EXPECT_TRUE(fixture.str().empty());
+}
+
+// Assert where an empty stream has its last (nonexistent) character popped
twice.
+TEST(StringStreamPopTest, EmptyPopTwice) {
+ StringStreamPop fixture;
+ fixture.move_back();
+ fixture.move_back();
+ EXPECT_TRUE(fixture.str().empty());
+}
+
+// Assert the move_back functionality does not actually remove the character.
+TEST(StringStreamPopTest, PopOnceBeforeAppend) {
+ StringStreamPop fixture;
+ fixture.move_back();
+ fixture << "a";
+ fixture.move_back();
+
+ // This assertion is correct because the move_back() function only moves the
write
+ // pointer, it does not modify the internal buffer.
+ EXPECT_EQ("a", fixture.str());
+}
+
+// Assert the StringStreamPop class behavior matches the behavior of the
stringstream
+// class.
+TEST(StringStreamPopTest, CompareWithStringstream) {
+ StringStreamPop fixture;
+ stringstream expected;
+
+ expected << "C++ is" << " an " << "invisible found" << "ation of " <<
"everything!";
+ fixture << "C++ is" << " an " << "invisible found" << "ation of " <<
"everything?";
+ fixture.move_back();
+ fixture << '!';
+
+ EXPECT_EQ(expected.str(), fixture.str());
+}
+
}
diff --git a/be/src/util/string-util.cc b/be/src/util/string-util.cc
index 5394b1ffd..3e0a92213 100644
--- a/be/src/util/string-util.cc
+++ b/be/src/util/string-util.cc
@@ -143,4 +143,11 @@ int FindUtf8PosBackward(const uint8_t* ptr, const int len,
int index) {
DCHECK_EQ(pos, -1);
return -1;
}
+
+void StringStreamPop::move_back() {
+ if (tellp() > 0) {
+ seekp(-1, std::ios_base::cur);
+ }
+}
+
}
diff --git a/be/src/util/string-util.h b/be/src/util/string-util.h
index e1749ce72..7b16bf498 100644
--- a/be/src/util/string-util.h
+++ b/be/src/util/string-util.h
@@ -93,6 +93,19 @@ int FindUtf8PosBackward(const uint8_t* str_ptr, const int
str_len, const int ind
inline int FindUtf8PosBackward(const char* str_ptr, const int str_len, const
int index) {
return FindUtf8PosBackward(reinterpret_cast<const uint8_t*>(str_ptr),
str_len, index);
}
-}
+/// Subclass of std::stringstream that adds functionality to allow overwriting
the very
+/// last character of the stream. The purpose of this additional functionality
is to
+/// enable comma delimited string building where the last instance of the
comma needs to
+/// be removed (for example when building a list of columns in a sql
statement).
+class StringStreamPop : public std::basic_stringstream<char> {
+public:
+ /// Directly modifies the underlying stream buffer seeking it backwards 1
position.
+ /// Then, when additional characters are written, the character at the end
of the stream
+ /// is overwritten. Thus, to truly remove the character at the end of the
stream
+ /// requires writing at least one character to the stream after this
function is called.
+ void move_back();
+};
+
+}
#endif
diff --git a/be/src/util/summary-util-test.cc b/be/src/util/summary-util-test.cc
new file mode 100644
index 000000000..c0e209cba
--- /dev/null
+++ b/be/src/util/summary-util-test.cc
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "testutil/gtest-util.h"
+
+#include "gen-cpp/ExecStats_types.h"
+#include "util/summary-util.h"
+
+namespace impala {
+
+static const string expected_str = "\n"
+"Operator #Hosts #Inst Avg Time Max Time #Rows Est. #Rows
Peak Mem "
+" Est. Peak Mem Detail \n"
+"----------------------------------------------------------------------------------------"
+"----------------------------------------------------------\n"
+"F01:ROOT 1 1 857.074us 857.074us
4.01 MB "
+" 4.00 MB \n"
+"01:EXCHANGE 1 1 269.934us 269.934us 99 7
88.00 KB "
+" 16.00 KB UNPARTITIONED \n"
+"F00:EXCHANGE SENDER 3 3 332.506us 338.449us
7.95 KB "
+" 96.00 KB \n"
+"00:SCAN HDFS 3 3 1s328ms 1s331ms 99 7
360.00 KB "
+" 64.00 MB default.test_query_log_beeswax_1707938440 ";
+
+static TExecStats buildExecStats(int64_t latency, int64_t mem_used,
+ int64_t cardinality) {
+ TExecStats stat;
+
+ if (latency > -1) {
+ stat.__set_latency_ns(latency);
+ }
+ stat.__set_memory_used(mem_used);
+ stat.__set_cardinality(cardinality);
+
+ return stat;
+}
+
+static TPlanNodeExecSummary buildPlanNode(TPlanNodeId node_id, TFragmentIdx
fragment_idx,
+ string label, string detail, int32_t num_hosts, int32_t num_children,
+ bool is_broadcast, TExecStats estimates) {
+ TPlanNodeExecSummary node;
+
+ node.__set_node_id(node_id);
+ node.__set_fragment_idx(fragment_idx);
+ node.__set_label(label);
+ node.__set_label_detail(detail);
+ node.__set_num_children(num_children);
+ node.__set_estimated_stats(estimates);
+ if (is_broadcast) {
+ node.__set_is_broadcast(is_broadcast);
+ }
+ node.__set_num_hosts(num_hosts);
+
+ return node;
+}
+
+// Constructs a simple exec summary and ensures the text table is generated
correctly for
+// that exec summary.
+TEST(PrintTableTest, HappyPath) {
+ TExecSummary input;
+
+ TPlanNodeExecSummary node = buildPlanNode(-1, 0, "F01:ROOT", "", 1, 1,
+ false, buildExecStats(-1, 4194304, -1));
+ node.exec_stats.push_back(buildExecStats(857074, 4202496, -1));
+ node.__isset.exec_stats = true;
+ input.nodes.push_back(node);
+
+ node = buildPlanNode(1, 0, "01:EXCHANGE", "UNPARTITIONED", 1, 0,
+ true, buildExecStats(-1, 16384, 7));
+ node.exec_stats.push_back(buildExecStats(269934, 90112, 99));
+ node.__isset.exec_stats = true;
+ input.nodes.push_back(node);
+
+ node = buildPlanNode(-1, 1, "F00:EXCHANGE SENDER", "", 3, 1,
+ false, buildExecStats(-1, 98304, -1));
+ node.exec_stats.push_back(buildExecStats(338449, 6862, -1));
+ node.exec_stats.push_back(buildExecStats(333098, 8139, -1));
+ node.exec_stats.push_back(buildExecStats(325971, 8139, -1));
+ node.__isset.exec_stats = true;
+ input.nodes.push_back(node);
+
+ node = buildPlanNode(0, 1, "00:SCAN HDFS",
+ "default.test_query_log_beeswax_1707938440", 3, 0, false,
+ buildExecStats(-1, 67108864, 7));
+ node.exec_stats.push_back(buildExecStats(1331010710, 368640, 39));
+ node.exec_stats.push_back(buildExecStats(1326558546, 279552, 30));
+ node.exec_stats.push_back(buildExecStats(1327758097, 283648, 30));
+ node.__isset.exec_stats = true;
+ input.nodes.push_back(node);
+
+ input.__isset.nodes = true;
+ input.exch_to_sender_map.emplace(1, 2);
+ input.__isset.exch_to_sender_map = true;
+
+ string actual = PrintExecSummary(input);
+ EXPECT_EQ(expected_str, actual);
+}
+
+} // namespace impala
\ No newline at end of file
diff --git a/be/src/util/ticker-test.cc b/be/src/util/ticker-test.cc
new file mode 100644
index 000000000..08fe8d6a3
--- /dev/null
+++ b/be/src/util/ticker-test.cc
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "testutil/gtest-util.h"
+
+#include "common/status.h"
+#include "util/stopwatch.h"
+#include "util/ticker.h"
+
+using namespace std;
+
+namespace impala {
+
+static inline float NsToMs(int64_t nanos) {
+ return static_cast<float>(nanos / 1000000);
+}
+
+TEST(TickerTest, TickerSecondsBoolHappyPath) {
+ condition_variable cv;
+ mutex mu;
+ uint8_t cntr = 0;
+
+ TickerSecondsBool fixture(1, cv, mu);
+ MonotonicStopWatch sw;
+
+ sw.Start();
+ ABORT_IF_ERROR(fixture.Start("category", "tickersecondsbool-happy-path"));
+
+ while (cntr < 3) {
+ unique_lock<mutex> l(mu);
+ cv.wait(l, fixture.WakeupGuard());
+ fixture.ResetWakeupGuard();
+ cntr++;
+ }
+
+ sw.Stop();
+
+ fixture.RequestStop();
+ fixture.Join();
+
+ EXPECT_EQ(cntr, 3);
+ // Include a 30 millisecond (1%) margin of error to tolerate differences in
the
+ // precision of time measurements.
+ EXPECT_NEAR(NsToMs(sw.ElapsedTime()), static_cast<float>(3000), 30);
+}
+
+TEST(TickerTest, GenericTickerHappyPath) {
+ condition_variable cv;
+ mutex mu;
+ shared_ptr<string> wakeup_guard = make_shared<string>();
+ uint8_t cntr = 0;
+ const string wakeup_val = "wakeup";
+
+ Ticker<chrono::milliseconds, string> fixture(chrono::milliseconds(5), cv, mu,
+ wakeup_guard, wakeup_val);
+ MonotonicStopWatch sw;
+
+ sw.Start();
+ ABORT_IF_ERROR(fixture.Start("category", "generic-ticker-happy-path"));
+
+ while (cntr < 10) {
+ unique_lock<mutex> l(mu);
+ cv.wait(l, fixture.WakeupGuard());
+ *wakeup_guard = "";
+ cntr++;
+ }
+
+ sw.Stop();
+
+ fixture.RequestStop();
+ fixture.Join();
+
+ EXPECT_EQ(cntr, 10);
+ // Include a 5 millisecond (10%) margin of error to tolerate differences in
the
+ // precision of time measurements.
+ EXPECT_NEAR(NsToMs(sw.ElapsedTime()), static_cast<float>(50), 5);
+}
+
+// Tests the case where the wakeup guard is not reset by the consuming code.
+TEST(TickerTest, GenericTickerNoWakeupGuardReset) {
+ condition_variable cv;
+ mutex mu;
+ shared_ptr<string> wakeup_guard = make_shared<string>();
+ uint8_t cntr = 0;
+ const string wakeup_val = "wakeup";
+
+ Ticker<chrono::milliseconds, string> fixture(chrono::milliseconds(5), cv, mu,
+ wakeup_guard, wakeup_val);
+ MonotonicStopWatch sw;
+
+ sw.Start();
+ ABORT_IF_ERROR(fixture.Start("category", "generic-ticker-happy-path"));
+
+ while (cntr < 10) {
+ unique_lock<mutex> l(mu);
+ cv.wait(l, fixture.WakeupGuard());
+ // No wakeup guard reset here.
+ cntr++;
+ }
+
+ sw.Stop();
+
+ fixture.RequestStop();
+ fixture.Join();
+
+ EXPECT_EQ(cntr, 10);
+ // If the wakeup guard was reset properly, elapsed time would be 50
milliseconds. Since
+ // the wakeup guard is never reset, the wait predicate stays true after the
first tick
+ // and every later wait returns immediately.
+ EXPECT_NEAR(NsToMs(sw.ElapsedTime()), static_cast<float>(5), 5);
+}
+
+} // namespace impala
diff --git a/be/src/util/ticker.h b/be/src/util/ticker.h
new file mode 100644
index 000000000..116e9574a
--- /dev/null
+++ b/be/src/util/ticker.h
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <chrono>
+#include <condition_variable>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include <boost/bind.hpp>
+
+#include "common/atomic.h"
+#include "common/status.h"
+#include "util/thread.h"
+
+namespace impala {
+
+// Manages a thread that periodically notifies a condition variable until
stopped. An
+// indicator variable must be specified to guard against spurious wakeups.
+//
+// Immediately before this class notifies the condition variable, it sets the
indicator
+// variable to the `wakeup_value` specified in the constructor. It is the
responsibility
+// of the thread consuming this class to reset the indicator variable to a
value other
+// than `wakeup_value` before the consuming thread goes to sleep.
+//
+// If the periodic code takes longer to run than the specified duration, then
the code
+// will immediately execute the next time around.
+//
+// Internally, this class uses std::this_thread::sleep_for which may sleep for
longer than
+// the specified duration due to scheduling or resource contention delays.
+// For details, see https://en.cppreference.com/w/cpp/thread/sleep_for.
+//
+// Example usage:
+//
+// #include <chrono>
+// #include <condition_variable>
+// #include <memory>
+// #include <mutex>
+//
+// #include "common/status.h"
+//
+// std::condition_variable cv;
+// std::mutex mu;
+// std::shared_ptr<bool> wakeup_guard = make_shared<bool>();
+// Ticker<std::chrono::seconds, bool> ticker(std::chrono::seconds(30), cv,
mu,
+// wakeup_guard, true);
+//
+// ABORT_IF_ERROR(ticker.Start("my-category", "my-ticker"));
+//
+// while(true) {
+// unique_lock<mutex> l(mu);
+// cv.wait(l, ticker.WakeupGuard());
+// *wakeup_guard = false;
+//
+// run_my_code();
+// }
+
+template <typename DurationType, typename IndicatorType>
+class Ticker {
+ public:
+ Ticker(DurationType interval, std::condition_variable& cv,
+ std::mutex& lock, std::shared_ptr<IndicatorType> indicator,
+ IndicatorType wakeup_value) : interval_(interval), cv_(cv),
lock_(lock),
+ indicator_(indicator), wakeup_value_(wakeup_value) {}
+
+ Status Start(const std::string& category, const std::string& name) {
+ return Thread::Create(category, name, &Ticker::run, this, &my_thread_);
+ }
+
+ // Specify that the next iteration of this ticker be the last. This
function does not
+ // block nor does it cause the ticker to wake up earlier than scheduled.
+ void RequestStop() {
+ stop_requested_.Store(true);
+ }
+
+ // Wait for the ticker to exit after its final iteration.
+ void Join() {
+ my_thread_->Join();
+ }
+
+ // Provides a default implementation for the condition variable predicate
lambda.
+ std::function<bool()> WakeupGuard() {
+ return [this]{ return *indicator_ == wakeup_value_; };
+ }
+
+ protected:
+ const DurationType interval_;
+ std::condition_variable& cv_;
+ std::mutex& lock_;
+ std::shared_ptr<IndicatorType> indicator_;
+ const IndicatorType wakeup_value_;
+
+ private:
+ std::unique_ptr<Thread> my_thread_;
+ AtomicBool stop_requested_;
+
+ void run() {
+ while (!stop_requested_.Load()) {
+ std::this_thread::sleep_for(interval_);
+
+ {
+ std::lock_guard<std::mutex> l(lock_);
+ *indicator_ = wakeup_value_;
+ }
+
+ cv_.notify_all();
+ }
+ }
+}; // class Ticker
+
+// Specialization of the Ticker class that uses seconds for the duration and
bool as the
+// wakeup indicator. The boolean shared_ptr indicator is internally managed.
Use the
+// ResetWakeupGuard() function in your code immediately after the condition
variable wait
+// to set the internally managed wakeup guard for the next iteration.
+class TickerSecondsBool : public Ticker<std::chrono::seconds, bool> {
+ public:
+ TickerSecondsBool(uint32_t interval, std::condition_variable& cv,
+ std::mutex& lock) :
+ Ticker(std::chrono::seconds(interval), cv, lock,
std::make_shared<bool>(), true) {}
+
+ void ResetWakeupGuard() {
+ *indicator_ = false;
+ }
+}; // class TickerSecondsBool
+
+} // namespace impala
diff --git a/be/src/util/uid-util-test.cc b/be/src/util/uid-util-test.cc
index 60b985b64..cf255f052 100644
--- a/be/src/util/uid-util-test.cc
+++ b/be/src/util/uid-util-test.cc
@@ -35,5 +35,28 @@ TEST(UidUtil, FragmentInstanceId) {
}
}
+TEST(UidUtil, UuidNotEmpty) {
+ TUniqueId fixture = GenerateUUID();
+ EXPECT_FALSE(UUIDEmpty(fixture));
+}
+
+TEST(UidUtil, UuidHalfEmptyHi) {
+ TUniqueId fixture;
+ fixture.hi = 0;
+ fixture.lo = 1;
+ EXPECT_FALSE(UUIDEmpty(fixture));
+}
+
+TEST(UidUtil, UuidHalfEmptyLo) {
+ TUniqueId fixture;
+ fixture.hi = 1;
+ fixture.lo = 0;
+ EXPECT_FALSE(UUIDEmpty(fixture));
+}
+
+TEST(UidUtil, UuidEmpty) {
+ EXPECT_TRUE(UUIDEmpty(TUniqueId()));
+}
+
}
diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h
index 04c1ad75e..b3fa39f5f 100644
--- a/be/src/util/uid-util.h
+++ b/be/src/util/uid-util.h
@@ -129,4 +129,9 @@ inline TUniqueId GenerateUUID() {
memcpy(&uid.lo, u.data() + sizeof(int64_t), sizeof(int64_t));
return uid;
}
+
+/// Determines if a query id is empty.
+inline bool UUIDEmpty(const TUniqueId& id) {
+ return id.hi == 0 && id.lo == 0;
+}
} // namespace impala
diff --git a/bin/run_clang_tidy.sh b/bin/run_clang_tidy.sh
index 816f201dd..7ca1d58da 100755
--- a/bin/run_clang_tidy.sh
+++ b/bin/run_clang_tidy.sh
@@ -29,8 +29,8 @@
set -euo pipefail
-echo "Compiling"
TMP_BUILDALL_LOG=$(mktemp)
+echo "Compiling, for build logs see ${TMP_BUILDALL_LOG}"
if ! ./buildall.sh -skiptests -tidy -so -noclean &> "${TMP_BUILDALL_LOG}"
then
echo "buildall.sh failed, dumping output" >&2
@@ -59,6 +59,7 @@ export
PATH="${IMPALA_TOOLCHAIN_PACKAGES_HOME}/llvm-${IMPALA_LLVM_VERSION}/share
:${IMPALA_TOOLCHAIN_PACKAGES_HOME}/llvm-${IMPALA_LLVM_VERSION}/bin/\
:$PATH"
TMP_STDERR=$(mktemp)
+echo; echo "Running clang tidy, for error logs see ${TMP_STDERR}"
STRCAT_MESSAGE="Impala-specific note: This can also be fixed using the
StrCat() function \
from be/src/gutil/strings strcat.h)"
CLANG_STRING_CONCAT="performance-inefficient-string-concatenation"