absurdfarce commented on code in PR #1743:
URL:
https://github.com/apache/cassandra-java-driver/pull/1743#discussion_r1628509853
##########
core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DistanceEvent.java:
##########
@@ -29,11 +31,16 @@
@Immutable
public class DistanceEvent {
public final NodeDistance distance;
- public final DefaultNode node;
+ private final WeakReference<DefaultNode> node;
Review Comment:
Same as discussed in NodeStateEvent; I don't think we need a WeakReference
here.
##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -274,40 +303,23 @@ protected boolean isBusy(@NonNull Node node, @NonNull Session session) {
}
protected boolean isResponseRateInsufficient(@NonNull Node node, long now) {
- // response rate is considered insufficient when less than 2 responses were obtained in
- // the past interval delimited by RESPONSE_COUNT_RESET_INTERVAL_NANOS.
- if (responseTimes.containsKey(node)) {
- AtomicLongArray array = responseTimes.get(node);
- if (array.length() == 2) {
- long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
- long leastRecent = array.get(0);
- return leastRecent - threshold < 0;
- }
+ NodeResponseRateSample sample = responseTimes.getIfPresent(node);
+ if (sample == null) {
+ return true;
Review Comment:
I'm wondering if this is too aggressive now that historical response time data
for a node is no longer guaranteed to be present. I'm leaning towards the
argument that we should only consider a node's response rate insufficient if
we have clear, unambiguous data indicating that. That means missing data
(either because it hasn't been observed yet or because GC pressure has removed
it now that we're using weak refs) would _not_ be enough to mark a node as
unhealthy.
@adutra I'm very interested in your take on this; you have considerably more
context with which to evaluate this question than I do. :)
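A minimal sketch of the lenient rule argued for above: only clear data (two recorded responses, the older of which falls outside the reset interval) marks the response rate as insufficient, while a missing or one-response entry does not. It assumes a plain `ConcurrentHashMap` keyed by `Object` in place of the Guava cache and `Node`, and raw `long[]` timestamps in place of `NodeResponseRateSample`; the class name `LenientResponseRateCheck` is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a plain map stands in for the Guava cache, and each value
// holds the most recent response timestamps (at most two, oldest first).
class LenientResponseRateCheck {
  static final long RESPONSE_COUNT_RESET_INTERVAL_NANOS = TimeUnit.MILLISECONDS.toNanos(200);

  final Map<Object, long[]> responseTimes = new ConcurrentHashMap<>();

  boolean isResponseRateInsufficient(Object node, long now) {
    long[] times = responseTimes.get(node);
    if (times == null || times.length < 2) {
      // Missing data (never observed, or evicted after GC) is not evidence
      // of a slow node, so don't mark it as insufficient.
      return false;
    }
    long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
    // Insufficient only if the older of the two responses predates the interval;
    // comparing via subtraction stays correct if nanoTime() values overflow.
    return times[0] - threshold < 0;
  }
}
```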
##########
core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeStateEvent.java:
##########
@@ -53,14 +55,19 @@ public static NodeStateEvent removed(DefaultNode node) {
*/
public final NodeState newState;
- public final DefaultNode node;
+ private final WeakReference<DefaultNode> node;
Review Comment:
I don't think we need this change. Events of this type are very
short-lived; they exist to communicate information between driver components
that don't necessarily know about each other. They aren't stockpiled or stored
in any meaningful way. You address the problem that was originally reported by
changing the distances map in LoadBalancingPolicyWrapper to use weak refs...
it's not at all clear to me that making the events use weak references buys you
much on top of that.
Perhaps more importantly, this change has the potential to make events a lot
less useful. The driver uses events to notify components about changes in
nodes, but if the _actual node affected_ might not be present, what use is the
notification? Components that receive events without node information have no
choice but to ignore them, which means (a) every receiver has to check for null
node information and (b) if you just ignore events without node data (which all
these receivers appear to do), you'll get a lot more indeterminate behaviour,
since the presence or absence of node data in an event is basically random
(it's determined by GC pressure, which is essentially random from the
driver's perspective).
I note that the null checks referenced in (a) above are a big chunk of
what's actually in this PR. If not for this constraint, the change set would be
a lot smaller. That wouldn't be the worst thing, as the presence of all those
null checks obscures the more significant changes to at least a moderate degree.
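To make point (a) concrete, here is a hedged sketch with hypothetical, simplified types (`FakeNode`, `StrongNodeEvent`, `WeakNodeEvent`, `EventReceiver`) rather than the driver's real `NodeStateEvent` or `DistanceEvent`. The strong-reference event always carries its node; the weak-reference version pushes a null check into every receiver, and which branch runs depends on when the garbage collector happens to run.

```java
import java.lang.ref.WeakReference;

// Hypothetical, simplified types; not the driver's real classes.
class FakeNode {
  final String id;

  FakeNode(String id) {
    this.id = id;
  }
}

// Event holding a strong reference: receivers can always use the node.
class StrongNodeEvent {
  final FakeNode node;

  StrongNodeEvent(FakeNode node) {
    this.node = node;
  }
}

// Event holding a weak reference: the node may be collected before delivery.
class WeakNodeEvent {
  private final WeakReference<FakeNode> node;

  WeakNodeEvent(FakeNode node) {
    this.node = new WeakReference<>(node);
  }

  FakeNode getNode() {
    return node.get(); // null once the node has been garbage collected
  }
}

class EventReceiver {
  static String onEvent(StrongNodeEvent event) {
    return "handled " + event.node.id;
  }

  static String onEvent(WeakNodeEvent event) {
    FakeNode node = event.getNode();
    if (node == null) {
      // Every receiver needs this branch, and whether it runs depends on GC timing.
      return "ignored";
    }
    return "handled " + node.id;
  }
}
```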
##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -96,14 +100,39 @@ public class DefaultLoadBalancingPolicy extends BasicLoadBalancingPolicy impleme
private static final int MAX_IN_FLIGHT_THRESHOLD = 10;
private static final long RESPONSE_COUNT_RESET_INTERVAL_NANOS = MILLISECONDS.toNanos(200);
- protected final Map<Node, AtomicLongArray> responseTimes = new ConcurrentHashMap<>();
+ protected final LoadingCache<Node, NodeResponseRateSample> responseTimes;
protected final Map<Node, Long> upTimes = new ConcurrentHashMap<>();
private final boolean avoidSlowReplicas;
public DefaultLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
super(context, profileName);
this.avoidSlowReplicas =
profile.getBoolean(DefaultDriverOption.LOAD_BALANCING_POLICY_SLOW_AVOIDANCE, true);
+ CacheLoader<Node, NodeResponseRateSample> cacheLoader =
+ new CacheLoader<Node, NodeResponseRateSample>() {
+ @Override
+ public NodeResponseRateSample load(Node key) {
+ NodeResponseRateSample sample = responseTimes.getIfPresent(key);
+ if (sample == null) {
+ sample = new NodeResponseRateSample();
+ } else {
+ sample.update();
+ }
+ return sample;
+ }
+ };
+ this.responseTimes =
+ CacheBuilder.newBuilder()
+ .weakKeys()
+ .removalListener(
+ (RemovalListener<Node, NodeResponseRateSample>)
+ notification ->
+ LOG.trace(
+ "[{}] Evicting response times for {}: {}",
+ logPrefix,
+ notification.getKey(),
+ notification.getCause()))
Review Comment:
@aratno I know you asked for this RemovalListener in a [previous
comment](https://github.com/apache/cassandra-java-driver/pull/1743#discussion_r1479160472)
under the assumption that it would help identify cases in which a node was
(incorrectly) marked unhealthy because data had expired from the map. I'm
going to argue instead that we shouldn't mark a node as unhealthy unless we
have clear data indicating that it is so... which means the absence of response
time data isn't enough to make it unhealthy. If we take that approach, would
you still argue for a removal listener here? I'm of two minds about it
myself; I can see the benefit, but it may be less interesting if the
potentially damaging effects are mitigated.
Thoughts?
##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -96,14 +100,39 @@ public class DefaultLoadBalancingPolicy extends BasicLoadBalancingPolicy impleme
private static final int MAX_IN_FLIGHT_THRESHOLD = 10;
private static final long RESPONSE_COUNT_RESET_INTERVAL_NANOS = MILLISECONDS.toNanos(200);
- protected final Map<Node, AtomicLongArray> responseTimes = new ConcurrentHashMap<>();
+ protected final LoadingCache<Node, NodeResponseRateSample> responseTimes;
protected final Map<Node, Long> upTimes = new ConcurrentHashMap<>();
private final boolean avoidSlowReplicas;
public DefaultLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
super(context, profileName);
this.avoidSlowReplicas =
profile.getBoolean(DefaultDriverOption.LOAD_BALANCING_POLICY_SLOW_AVOIDANCE, true);
+ CacheLoader<Node, NodeResponseRateSample> cacheLoader =
+ new CacheLoader<Node, NodeResponseRateSample>() {
+ @Override
+ public NodeResponseRateSample load(Node key) {
+ NodeResponseRateSample sample = responseTimes.getIfPresent(key);
+ if (sample == null) {
+ sample = new NodeResponseRateSample();
+ } else {
+ sample.update();
+ }
+ return sample;
+ }
+ };
Review Comment:
You actually don't need a LoadingCache here; a simple Cache will do. A
LoadingCache is useful when you need to compute an entry for a key that's been
requested but isn't in the cache yet. That's not what's going on here: your
CacheLoader looks up the existing value in the map, creates a new one if it
isn't present, and otherwise updates it. You're not doing anything to create the
NodeResponseRateSample instance beyond calling the constructor if necessary.
You can accomplish the exact same thing with a regular Cache by
changing your update logic just a bit:
```java
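protected void updateResponseTimes(@NonNull Node node) {
  try {
    // With responseTimes declared as a plain Cache<Node, NodeResponseRateSample>,
    // get(key, loader) returns the cached sample or creates, caches and returns a new one.
    responseTimes.get(node, NodeResponseRateSample::new).update();
  } catch (ExecutionException ee) {
    LOG.info("[{}] Exception updating node response times: {}", logPrefix, ee);
  }
}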
```
I note that this approach is even
[recommended](https://javadoc.io/static/com.google.guava/guava/33.2.1-jre/com/google/common/cache/Cache.html#put(K,V))
in the Guava Javadoc.
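The same "return the cached value, or create, cache and return it" shape can be sketched without Guava using `ConcurrentMap.computeIfAbsent`, a swapped-in stand-in for `Cache.get(K, Callable)` that has no weak keys or eviction. The sketch assumes a hypothetical `Sample` whose constructor records nothing, so each response is recorded exactly once by `update()`. With the `NodeResponseRateSample` constructor quoted further down, which already stores `nanoTime()`, a `get(...).update()` call on a brand-new entry would record two timestamps for a node's first response.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: ConcurrentMap.computeIfAbsent stands in for Guava's
// Cache.get(key, loader) "get or create" call.
class GetOrCreateSketch {
  // Hypothetical sample type: the constructor records nothing, so each
  // response is recorded exactly once, by update().
  static final class Sample {
    private long older;
    private long newer;
    private int count;

    synchronized void update(long now) {
      older = newer;
      newer = now;
      count = Math.min(count + 1, 2);
    }

    synchronized int count() {
      return count;
    }

    synchronized long older() {
      return older;
    }
  }

  final ConcurrentMap<Object, Sample> responseTimes = new ConcurrentHashMap<>();

  void updateResponseTimes(Object node, long now) {
    // Returns the existing sample, or atomically creates and stores a new one.
    responseTimes.computeIfAbsent(node, k -> new Sample()).update(now);
  }
}
```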
##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -274,40 +303,23 @@ protected boolean isBusy(@NonNull Node node, @NonNull Session session) {
}
protected boolean isResponseRateInsufficient(@NonNull Node node, long now) {
- // response rate is considered insufficient when less than 2 responses were obtained in
- // the past interval delimited by RESPONSE_COUNT_RESET_INTERVAL_NANOS.
- if (responseTimes.containsKey(node)) {
- AtomicLongArray array = responseTimes.get(node);
- if (array.length() == 2) {
- long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
- long leastRecent = array.get(0);
- return leastRecent - threshold < 0;
- }
+ NodeResponseRateSample sample = responseTimes.getIfPresent(node);
+ if (sample == null) {
+ return true;
+ } else {
+ return !sample.hasSufficientResponses(now);
Review Comment:
Style nit: the ternary operator does this a bit more cleanly:
```java
return (sample == null) ? true : !sample.hasSufficientResponses(now);
```
##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -318,4 +330,42 @@ protected int getInFlight(@NonNull Node node, @NonNull Session session) {
// processing them).
return (pool == null) ? 0 : pool.getInFlight();
}
+
+ // Exposed as protected for unit tests
+ protected class NodeResponseRateSample {
+ // The array stores at most two timestamps, since we don't need more;
+ // the first one is always the least recent one, and hence the one to inspect.
+ protected AtomicLongArray times;
+
+ private NodeResponseRateSample() {
+ times = new AtomicLongArray(1);
+ times.set(0, nanoTime());
+ }
+
+ // Only for unit tests
+ protected NodeResponseRateSample(AtomicLongArray times) {
+ this.times = times;
+ }
+
+ private void update() {
+ long now = nanoTime();
+ if (times.length() == 1) {
+ long previous = times.get(0);
+ times = new AtomicLongArray(2);
+ times.set(0, previous);
+ times.set(1, now);
+ } else {
+ times.set(0, times.get(1));
+ times.set(1, now);
+ }
+ }
Review Comment:
This seems to me to be subject to race conditions, although I haven't been
able to construct a specific scenario that triggers one. The old implementation
computed a result array locally and then returned it; there was no change
of state at the ultimate destination (the old responseTimes map) beyond
updating an entry or not doing so. This implementation does more than
that: it actually replaces this.times with an array of a different size
based on the data we receive.
It might be okay... but then again I'm also not sure I see the harm in just
making this.times an array of size two from the beginning.
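One way to follow that suggestion, sketched under assumptions: always keep exactly two slots and, as a swapped-in technique rather than anything in the PR, publish each update as a new immutable snapshot through `AtomicReference.updateAndGet`. A fixed-size `AtomicLongArray` alone would still perform "shift the newer time down, then record now" as two separate writes. In the quoted `update()`, `times` is also not `volatile`, so two threads could both read a one-element array and each install its own two-element replacement, dropping one timestamp. `TwoSlotSample`, `Snapshot` and `count` are hypothetical names; `hasSufficientResponses` follows the comment in the removed code (two responses within `RESPONSE_COUNT_RESET_INTERVAL_NANOS`).

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical rework of NodeResponseRateSample: two slots from the start,
// replaced atomically as an immutable snapshot.
class TwoSlotSample {
  static final long RESPONSE_COUNT_RESET_INTERVAL_NANOS = TimeUnit.MILLISECONDS.toNanos(200);

  // The two most recent response times (older first) and how many are real.
  private static final class Snapshot {
    final long older;
    final long newer;
    final int count; // 0, 1 or 2

    Snapshot(long older, long newer, int count) {
      this.older = older;
      this.newer = newer;
      this.count = count;
    }
  }

  private final AtomicReference<Snapshot> times =
      new AtomicReference<>(new Snapshot(0L, 0L, 0));

  void update(long now) {
    // updateAndGet retries on contention, so no update is lost and
    // "shift newer to older, then record now" happens as one step.
    times.updateAndGet(s -> new Snapshot(s.newer, now, Math.min(s.count + 1, 2)));
  }

  // Sufficient means two responses, the older one within the reset interval.
  boolean hasSufficientResponses(long now) {
    Snapshot s = times.get();
    if (s.count < 2) {
      return false;
    }
    long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
    return s.older - threshold >= 0;
  }
}
```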
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.