This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 411309acf IMPALA-11979: Add 'scheduling_seed' to customize consistent scheduling behavior
411309acf is described below

commit 411309acf4d3f326f05dfa04749a0ec0e2ccc801
Author: Joe McDonnell <[email protected]>
AuthorDate: Sun Dec 15 13:15:12 2024 -0800

    IMPALA-11979: Add 'scheduling_seed' to customize consistent scheduling behavior
    
    This adds a startup parameter --scheduling_seed which is a string
    identifier of an executor within an executor group. It should
    be unique within an executor group, but it could be reused
    across executor groups on the same system. This is used for
    scan range scheduling for remote filesystems, so this can be
    used to make the scheduling deterministic across multiple
    executor groups or when an executor group gets restarted on
    machines with different IP addresses.
    
    For example, the 3rd executor in an executor group of size 8
    might use "executor_3_of_8" for its scheduling seed. If there
    are multiple executor groups of size 8, the 3rd in each can
    use that scheduling seed.
    
    Testing:
     - Ran core job
    
    Change-Id: Ie01c7d119cc88766082dbfca3ff685354d01f71f
    Reviewed-on: http://gerrit.cloudera.org:8080/22214
    Reviewed-by: Joe McDonnell <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/experiments/hash-ring-util.cc     | 12 +++++++++++-
 be/src/scheduling/executor-group.cc      |  4 +++-
 be/src/scheduling/hash-ring-test.cc      | 21 ++++++++++++++++++---
 be/src/scheduling/hash-ring.cc           |  9 ++++++---
 be/src/scheduling/hash-ring.h            |  4 +++-
 be/src/service/impala-server.cc          |  8 ++++++++
 common/protobuf/statestore_service.proto |  6 ++++++
 7 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/be/src/experiments/hash-ring-util.cc b/be/src/experiments/hash-ring-util.cc
index e866f1096..439f743aa 100644
--- a/be/src/experiments/hash-ring-util.cc
+++ b/be/src/experiments/hash-ring-util.cc
@@ -21,6 +21,7 @@
 #include <iostream>
 
 #include "common/init.h"
+#include "gutil/strings/substitute.h"
 #include "scheduling/hash-ring.h"
 #include "scheduling/cluster-membership-test-util.h"
 #include "scheduling/scheduler-test-util.h"
@@ -33,6 +34,9 @@
 
 DEFINE_int32(num_hosts, 10, "Number of hosts for simulation");
 DEFINE_int32(num_replicas, 10, "Replication factor for hashring");
+DEFINE_bool(use_scheduling_seeds, false,
+    "If true, simulate using strings like 'executor_1_of_8' as scheduling seeds. "
+    "If false, use IP addresses as scheduling seeds.");
 
 namespace impala {
 
@@ -55,7 +59,13 @@ public:
     HashRing hashring(num_replicas);
     for (int host_idx = 0; host_idx < num_hosts; host_idx++) {
       IpAddr node = test::HostIdxToIpAddr(host_idx);
-      hashring.AddNode(node);
+      if (FLAGS_use_scheduling_seeds) {
+        std::string scheduling_seed =
+            Substitute("executor_$0_of_$1", host_idx, num_hosts);
+        hashring.AddNode(node, scheduling_seed);
+      } else {
+        hashring.AddNode(node);
+      }
     }
     int64_t end_nanos = MonotonicNanos();
     map<IpAddr, uint64_t> distribution;
diff --git a/be/src/scheduling/executor-group.cc b/be/src/scheduling/executor-group.cc
index d50ea1344..988a08e6c 100644
--- a/be/src/scheduling/executor-group.cc
+++ b/be/src/scheduling/executor-group.cc
@@ -95,7 +95,9 @@ void ExecutorGroup::AddExecutor(const BackendDescriptorPB& be_desc) {
     return;
   }
   if (be_descs.empty()) {
-    executor_ip_hash_ring_.AddNode(be_desc.ip_address());
+    // Use the 'scheduling_seed' for hashing to allow different executor groups to
+    // schedule the same way.
+    executor_ip_hash_ring_.AddNode(be_desc.ip_address(), be_desc.scheduling_seed());
   }
   be_descs.push_back(be_desc);
 
diff --git a/be/src/scheduling/hash-ring-test.cc b/be/src/scheduling/hash-ring-test.cc
index eeecca58d..e57f1890e 100644
--- a/be/src/scheduling/hash-ring-test.cc
+++ b/be/src/scheduling/hash-ring-test.cc
@@ -19,6 +19,7 @@
 #include <math.h>
 #include <stdio.h>
 #include <iostream>
+#include <optional>
 
 #include "scheduling/hash-ring.h"
 #include "testutil/gtest-util.h"
@@ -39,9 +40,11 @@ class HashRingTest : public ::testing::Test {
   // Verify the specified hash ring has the appropriate number of nodes and replicas.
   // This assumes no collisions. If there are collisions, the total replicas will be
   // smaller than expected.
-  void VerifyCounts(const HashRing& hash_ring, uint32_t num_nodes) {
-    EXPECT_EQ(hash_ring.GetNumNodes(), num_nodes);
-    EXPECT_EQ(hash_ring.GetTotalReplicas(), num_nodes * hash_ring.GetNumReplicas());
+  void VerifyCounts(const HashRing& hash_ring, uint32_t expected_num_nodes,
+      std::optional<uint32_t> expected_num_replicas = std::nullopt) {
+    EXPECT_EQ(hash_ring.GetNumNodes(), expected_num_nodes);
+    EXPECT_EQ(hash_ring.GetTotalReplicas(),
+        expected_num_replicas.value_or(expected_num_nodes * hash_ring.GetNumReplicas()));
   }
 
   // Verify that the allocations that GetDistributionMap() returns for each node
@@ -205,6 +208,18 @@ TEST_F(HashRingTest, Collisions) {
   VerifyCounts(h, 1000);
 }
 
+TEST_F(HashRingTest, SchedulingSeed) {
+  const uint32_t replication = 10;
+  vector<IpAddr> basic_addresses;
+  GetBasicNetworkAddresses(basic_addresses);
+  HashRing h(replication);
+  // Add the nodes to the hashring with a single scheduling seed. This will cause all of
+  // the nodes to collide.
+  for (const IpAddr& addr : basic_addresses) h.AddNode(addr, "abc123");
+  // All the ip addresses were added, but they all collide.
+  VerifyCounts(h, basic_addresses.size(), replication);
+}
+
 TEST_F(HashRingTest, MaxMinRatio) {
   // The ratio of the maximum node to the minimum node should be bounded.
   // This is purely a functional question. It makes no assumption about the underlying
diff --git a/be/src/scheduling/hash-ring.cc b/be/src/scheduling/hash-ring.cc
index 37bbed5c9..ce512b62d 100644
--- a/be/src/scheduling/hash-ring.cc
+++ b/be/src/scheduling/hash-ring.cc
@@ -40,14 +40,17 @@ HashRing::HashRing(const HashRing& hash_ring)
   }
 }
 
-void HashRing::AddNode(const IpAddr& node) {
+void HashRing::AddNode(const IpAddr& node, std::string_view scheduling_seed_in) {
   // This node should not already be in the set.
   std::pair<NodeIterator, bool> node_pair = nodes_.insert(node);
   // 'second' tells whether a new element was inserted. It must be true.
   DCHECK(node_pair.second) << "Failed to add: " << node;
   NodeIterator node_it = node_pair.first;
-  // Generate multiple hashes of the IpAddr by using the hash as a seed to a PRNG.
-  uint32_t hash = HashUtil::Hash(node.data(), node.length(), 0);
+  // If the scheduling seed argument is empty, use the IP address
+  std::string_view scheduling_seed = scheduling_seed_in;
+  if (scheduling_seed.empty()) scheduling_seed = node;
+  uint32_t hash = HashUtil::Hash(scheduling_seed.data(), scheduling_seed.length(), 0);
+  // Generate multiple hashes for the node by using the hash as a seed to a PRNG.
   pcg32 prng(hash);
   for (uint32_t i = 0; i < num_replicas_; i++) {
     uint32_t hash_val = prng();
diff --git a/be/src/scheduling/hash-ring.h b/be/src/scheduling/hash-ring.h
index 73a13c883..514a5dc39 100644
--- a/be/src/scheduling/hash-ring.h
+++ b/be/src/scheduling/hash-ring.h
@@ -20,6 +20,7 @@
 
 #include <map>
 #include <set>
+#include <string_view>
 #include <vector>
 
 #include "common/logging.h"
@@ -72,7 +73,8 @@ class HashRing {
   /// insert the node into the map at the hash location. In the event of a hash collision,
   /// the map will be set to the minimum of the current value and the new value.
   /// Nodes must be unique.
-  void AddNode(const IpAddr& node);
+  void AddNode(const IpAddr& node,
+      std::string_view scheduling_seed = std::string_view());
 
   /// This removes the specified node from the hashring. This removes all elements that
   /// reference this node. Nodes must be unique.
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 40409a252..6ac2646cc 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -328,6 +328,13 @@ DEFINE_string(executor_groups, "",
     "contains at least that number of executors for the group will it be considered "
     "healthy for admission. Currently only a single group may be specified.");
 
+DEFINE_string(scheduling_seed, "",
+    "Unique identifier for this executor within its executor group. This is used for "
+    "consistent scheduling. If not specified, this uses the executor's IP address. "
+    "To have consistent scheduling across different executor groups, this can be set "
+    "so that executor 1 out of 8 on executor group A has the same value as executor 1 "
+    "out of 8 on executor group B.");
+
 DEFINE_int32(num_expected_executors, 20,
     "The number of executors that are expected to "
     "be available for the execution of a single query. This value is used during "
@@ -2663,6 +2670,7 @@ void ImpalaServer::BuildLocalBackendDescriptorInternal(BackendDescriptorPB* be_d
     be_desc->set_process_start_time(CurrentTimeString());
   }
   be_desc->set_version(GetBuildVersion(/* compact */ true));
+  be_desc->set_scheduling_seed(FLAGS_scheduling_seed);
 }
 
 void ImpalaServer::ConnectionStart(
diff --git a/common/protobuf/statestore_service.proto b/common/protobuf/statestore_service.proto
index 2f2f83168..d459660c0 100644
--- a/common/protobuf/statestore_service.proto
+++ b/common/protobuf/statestore_service.proto
@@ -84,4 +84,10 @@ message BackendDescriptorPB {
   // The pretty-printed string representation of program version and build version for
   // this backend.
   optional string version = 14;
+
+  // Unique identifier of an executor within a single executor group (e.g. 3rd out of 8
+  // executors). This is used for consistently scheduling scan ranges. This is unique
+  // within an executor group, but executors in different executor groups can use the
+  // same identifier.
+  optional string scheduling_seed = 15;
 }

Reply via email to