This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d3a875f8b6 feat(object_store): random IP address selection (#7123)
d3a875f8b6 is described below
commit d3a875f8b603d2c7429964a6f8959bd055616d34
Author: Marco Neumann <[email protected]>
AuthorDate: Wed Feb 12 14:17:09 2025 +0100
feat(object_store): random IP address selection (#7123)
* feat(object_store): random IP address selection
Closes #7117.
* refactor: directly call stdlib w/o hyper-util
---
object_store/src/client/dns.rs | 50 ++++++++++++++++++++++++++++++++++++++++++
object_store/src/client/mod.rs | 18 +++++++++++++++
2 files changed, 68 insertions(+)
diff --git a/object_store/src/client/dns.rs b/object_store/src/client/dns.rs
new file mode 100644
index 0000000000..32e9291bac
--- /dev/null
+++ b/object_store/src/client/dns.rs
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::net::ToSocketAddrs;
+
+use rand::prelude::SliceRandom;
+use reqwest::dns::{Addrs, Name, Resolve, Resolving};
+use tokio::task::JoinSet;
+
+type DynErr = Box<dyn std::error::Error + Send + Sync>;
+
+/// Resolves host names with [`ToSocketAddrs`] and yields the addresses in random order.
+#[derive(Debug)]
+pub(crate) struct ShuffleResolver;
+
+impl Resolve for ShuffleResolver {
+    fn resolve(&self, name: Name) -> Resolving {
+        Box::pin(async move {
+            // The lookup blocks, so run it on the blocking pool. Dropping the `JoinSet`
+            // aborts the task only if it has not started yet; a running lookup completes.
+            let mut lookup = JoinSet::new();
+            lookup.spawn_blocking(move || {
+                let mut candidates: Vec<_> = (name.as_str(), 0).to_socket_addrs()?.collect();
+                // Shuffle so connections spread across all returned addresses.
+                candidates.shuffle(&mut rand::rng());
+                Ok(Box::new(candidates.into_iter()) as Addrs)
+            });
+
+            let joined = lookup.join_next().await.expect("spawned on task");
+            match joined {
+                Ok(resolved) => resolved,
+                Err(join_err) => Err(Box::new(join_err) as DynErr),
+            }
+        })
+    }
+}
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 1b7ce5aa7a..6297159556 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -19,6 +19,8 @@
pub(crate) mod backoff;
+mod dns;
+
#[cfg(test)]
pub(crate) mod mock_server;
@@ -110,6 +112,10 @@ pub enum ClientConfigKey {
ProxyCaCertificate,
/// List of hosts that bypass proxy
ProxyExcludes,
+ /// Randomize the order of the addresses that DNS resolution yields.
+ ///
+ /// This spreads connections across more servers.
+ RandomizeAddresses,
/// Request timeout
///
/// The timeout is applied from when the request starts connecting until
the
@@ -137,6 +143,7 @@ impl AsRef<str> for ClientConfigKey {
Self::ProxyUrl => "proxy_url",
Self::ProxyCaCertificate => "proxy_ca_certificate",
Self::ProxyExcludes => "proxy_excludes",
+ Self::RandomizeAddresses => "randomize_addresses",
Self::Timeout => "timeout",
Self::UserAgent => "user_agent",
}
@@ -163,6 +170,7 @@ impl FromStr for ClientConfigKey {
"proxy_url" => Ok(Self::ProxyUrl),
"proxy_ca_certificate" => Ok(Self::ProxyCaCertificate),
"proxy_excludes" => Ok(Self::ProxyExcludes),
+ "randomize_addresses" => Ok(Self::RandomizeAddresses),
"timeout" => Ok(Self::Timeout),
"user_agent" => Ok(Self::UserAgent),
_ => Err(super::Error::UnknownConfigurationKey {
@@ -245,6 +253,7 @@ pub struct ClientOptions {
http2_max_frame_size: Option<ConfigValue<u32>>,
http1_only: ConfigValue<bool>,
http2_only: ConfigValue<bool>,
+ randomize_addresses: ConfigValue<bool>,
}
impl Default for ClientOptions {
@@ -280,6 +289,7 @@ impl Default for ClientOptions {
// https://github.com/apache/arrow-rs/issues/5194
http1_only: true.into(),
http2_only: Default::default(),
+ randomize_addresses: true.into(),
}
}
}
@@ -322,6 +332,9 @@ impl ClientOptions {
ClientConfigKey::ProxyUrl => self.proxy_url = Some(value.into()),
ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate =
Some(value.into()),
ClientConfigKey::ProxyExcludes => self.proxy_excludes =
Some(value.into()),
+ ClientConfigKey::RandomizeAddresses => {
+ self.randomize_addresses.parse(value);
+ }
ClientConfigKey::Timeout => self.timeout =
Some(ConfigValue::Deferred(value.into())),
ClientConfigKey::UserAgent => {
self.user_agent = Some(ConfigValue::Deferred(value.into()))
@@ -358,6 +371,7 @@ impl ClientOptions {
ClientConfigKey::ProxyUrl => self.proxy_url.clone(),
ClientConfigKey::ProxyCaCertificate =>
self.proxy_ca_certificate.clone(),
ClientConfigKey::ProxyExcludes => self.proxy_excludes.clone(),
+ ClientConfigKey::RandomizeAddresses =>
Some(self.randomize_addresses.to_string()),
ClientConfigKey::Timeout =>
self.timeout.as_ref().map(fmt_duration),
ClientConfigKey::UserAgent => self
.user_agent
@@ -675,6 +689,10 @@ impl ClientOptions {
// transparently decompress the body via the non-default `gzip`
feature.
builder = builder.no_gzip();
+ if self.randomize_addresses.get()? {
+ builder = builder.dns_resolver(Arc::new(dns::ShuffleResolver));
+ }
+
builder
.https_only(!self.allow_http.get()?)
.build()