This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d3a875f8b6 feat(object_store): random IP address selection (#7123)
d3a875f8b6 is described below

commit d3a875f8b603d2c7429964a6f8959bd055616d34
Author: Marco Neumann <[email protected]>
AuthorDate: Wed Feb 12 14:17:09 2025 +0100

    feat(object_store): random IP address selection (#7123)
    
    * feat(object_store): random IP address selection
    
    Closes #7117.
    
    * refactor: directly call stdlib w/o hyper-util
---
 object_store/src/client/dns.rs | 50 ++++++++++++++++++++++++++++++++++++++++++
 object_store/src/client/mod.rs | 18 +++++++++++++++
 2 files changed, 68 insertions(+)

diff --git a/object_store/src/client/dns.rs b/object_store/src/client/dns.rs
new file mode 100644
index 0000000000..32e9291bac
--- /dev/null
+++ b/object_store/src/client/dns.rs
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::net::ToSocketAddrs;
+
+use rand::prelude::SliceRandom;
+use reqwest::dns::{Addrs, Name, Resolve, Resolving};
+use tokio::task::JoinSet;
+
+type DynErr = Box<dyn std::error::Error + Send + Sync>;
+
+/// DNS resolver that hands out the resolved addresses in random order.
+///
+/// The type carries no state; all work happens in its [`Resolve`]
+/// implementation.
+#[derive(Debug)]
+pub(crate) struct ShuffleResolver;
+
+impl Resolve for ShuffleResolver {
+    /// Resolves `name` through the standard library's [`ToSocketAddrs`] on a
+    /// blocking task and yields the resulting addresses in shuffled order.
+    ///
+    /// A failed lookup, or a failure to join the blocking task, is returned
+    /// as a boxed error.
+    fn resolve(&self, name: Name) -> Resolving {
+        Box::pin(async move {
+            // use `JoinSet` to propagate cancelation
+            let mut tasks = JoinSet::new();
+            tasks.spawn_blocking(move || {
+                // `ToSocketAddrs` needs a `(host, port)` pair; port 0 is a
+                // placeholder here.
+                // NOTE(review): presumably the HTTP client substitutes the
+                // real port afterwards -- confirm against the reqwest docs.
+                let it = (name.as_str(), 0).to_socket_addrs()?;
+                let mut addrs = it.collect::<Vec<_>>();
+
+                // Randomize the order in which the addresses are handed out.
+                addrs.shuffle(&mut rand::rng());
+
+                Ok(Box::new(addrs.into_iter()) as Addrs)
+            });
+
+            // Exactly one task was spawned above, so `join_next` is expected
+            // to yield `Some`. A join error is boxed and returned early;
+            // otherwise the task's own `Result` is the value of this block.
+            tasks
+                .join_next()
+                .await
+                .expect("spawned on task")
+                .map_err(|err| Box::new(err) as DynErr)?
+        })
+    }
+}
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 1b7ce5aa7a..6297159556 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -19,6 +19,8 @@
 
 pub(crate) mod backoff;
 
+mod dns;
+
 #[cfg(test)]
 pub(crate) mod mock_server;
 
@@ -110,6 +112,10 @@ pub enum ClientConfigKey {
     ProxyCaCertificate,
     /// List of hosts that bypass proxy
     ProxyExcludes,
+    /// Randomize the order of the addresses that DNS resolution yields.
+    ///
+    /// This will spread the connections across more servers.
+    RandomizeAddresses,
     /// Request timeout
     ///
     /// The timeout is applied from when the request starts connecting until 
the
@@ -137,6 +143,7 @@ impl AsRef<str> for ClientConfigKey {
             Self::ProxyUrl => "proxy_url",
             Self::ProxyCaCertificate => "proxy_ca_certificate",
             Self::ProxyExcludes => "proxy_excludes",
+            Self::RandomizeAddresses => "randomize_addresses",
             Self::Timeout => "timeout",
             Self::UserAgent => "user_agent",
         }
@@ -163,6 +170,7 @@ impl FromStr for ClientConfigKey {
             "proxy_url" => Ok(Self::ProxyUrl),
             "proxy_ca_certificate" => Ok(Self::ProxyCaCertificate),
             "proxy_excludes" => Ok(Self::ProxyExcludes),
+            "randomize_addresses" => Ok(Self::RandomizeAddresses),
             "timeout" => Ok(Self::Timeout),
             "user_agent" => Ok(Self::UserAgent),
             _ => Err(super::Error::UnknownConfigurationKey {
@@ -245,6 +253,7 @@ pub struct ClientOptions {
     http2_max_frame_size: Option<ConfigValue<u32>>,
     http1_only: ConfigValue<bool>,
     http2_only: ConfigValue<bool>,
+    randomize_addresses: ConfigValue<bool>,
 }
 
 impl Default for ClientOptions {
@@ -280,6 +289,7 @@ impl Default for ClientOptions {
             // https://github.com/apache/arrow-rs/issues/5194
             http1_only: true.into(),
             http2_only: Default::default(),
+            randomize_addresses: true.into(),
         }
     }
 }
@@ -322,6 +332,9 @@ impl ClientOptions {
             ClientConfigKey::ProxyUrl => self.proxy_url = Some(value.into()),
             ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate = 
Some(value.into()),
             ClientConfigKey::ProxyExcludes => self.proxy_excludes = 
Some(value.into()),
+            ClientConfigKey::RandomizeAddresses => {
+                self.randomize_addresses.parse(value);
+            }
             ClientConfigKey::Timeout => self.timeout = 
Some(ConfigValue::Deferred(value.into())),
             ClientConfigKey::UserAgent => {
                 self.user_agent = Some(ConfigValue::Deferred(value.into()))
@@ -358,6 +371,7 @@ impl ClientOptions {
             ClientConfigKey::ProxyUrl => self.proxy_url.clone(),
             ClientConfigKey::ProxyCaCertificate => 
self.proxy_ca_certificate.clone(),
             ClientConfigKey::ProxyExcludes => self.proxy_excludes.clone(),
+            ClientConfigKey::RandomizeAddresses => 
Some(self.randomize_addresses.to_string()),
             ClientConfigKey::Timeout => 
self.timeout.as_ref().map(fmt_duration),
             ClientConfigKey::UserAgent => self
                 .user_agent
@@ -675,6 +689,10 @@ impl ClientOptions {
         // transparently decompress the body via the non-default `gzip` 
feature.
         builder = builder.no_gzip();
 
+        if self.randomize_addresses.get()? {
+            builder = builder.dns_resolver(Arc::new(dns::ShuffleResolver));
+        }
+
         builder
             .https_only(!self.allow_http.get()?)
             .build()

Reply via email to