This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
new 5e39c54678 Avoid initializing schema via
SystemKeyspace.getPreferredIP() with the BulkLoader tool
5e39c54678 is described below
commit 5e39c54678b0d70145b5c699593eb9ce7fa7706a
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Wed Jun 22 17:12:41 2022 -0600
Avoid initializing schema via SystemKeyspace.getPreferredIP() with the
BulkLoader tool
patch by Caleb Rackliffe; reviewed by Jon Meredith for CASSANDRA-17740
Co-authored-by: Caleb Rackliffe <[email protected]>
Co-authored-by: Jon Meredith <[email protected]>
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/SystemKeyspace.java | 3 +++
.../org/apache/cassandra/streaming/StreamingChannel.java | 14 ++++++++++++++
.../streaming/async/StreamingMultiplexedChannel.java | 2 +-
.../apache/cassandra/tools/BulkLoadConnectionFactory.java | 8 +++++++-
test/unit/org/apache/cassandra/tools/ToolRunner.java | 9 +++++----
6 files changed, 31 insertions(+), 6 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index e1f6a2dd66..f469fe25b2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1-alpha2
+ * Avoid initializing schema via SystemKeyspace.getPreferredIP() with the
BulkLoader tool (CASSANDRA-17740)
* Uncomment prepared_statements_cache_size, key_cache_size,
counter_cache_size, index_summary_capacity which were
commented out by mistake in a previous patch
Fix breaking change with cache_load_timeout; cache_load_timeout_seconds <=0
and cache_load_timeout=0 are equivalent
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 6fbbc3e621..1523720d53 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -42,6 +42,7 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -978,6 +979,8 @@ public final class SystemKeyspace
*/
public static InetAddressAndPort getPreferredIP(InetAddressAndPort ep)
{
+ Preconditions.checkState(DatabaseDescriptor.isDaemonInitialized()); //
Make sure being used as a daemon, not a tool
+
String req = "SELECT preferred_ip, preferred_port FROM system.%s WHERE
peer=? AND peer_port = ?";
UntypedResultSet result = executeInternal(String.format(req,
PEERS_V2), ep.getAddress(), ep.getPort());
if (!result.isEmpty() && result.one().has("preferred_ip"))
diff --git a/src/java/org/apache/cassandra/streaming/StreamingChannel.java
b/src/java/org/apache/cassandra/streaming/StreamingChannel.java
index a638638bc2..6b623b8588 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingChannel.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingChannel.java
@@ -58,6 +58,20 @@ public interface StreamingChannel
// Implementations can decide whether or not to do something with
the preferred address.
return create(to, messagingVersion, kind);
}
+
+ /** Provide way to disable getPreferredIP() for tools without access
to the system keyspace
+ *
+ * CASSANDRA-17663 moves calls to SystemKeyspace.getPreferredIP()
outside of any threads
+ * that are regularly interrupted. However the streaming subsystem is
also used
+ * by the bulk loader tool, which does not have direct access to the
local tables
+ * and uses the client metadata/queries to retrieve it.
+ *
+ * @return true if SystemKeyspace.getPreferredIP() should be used when
connecting
+ */
+ default boolean supportsPreferredIp()
+ {
+ return true;
+ }
}
public enum Kind { CONTROL, FILE }
diff --git
a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
index c2e551edb6..38277e6741 100644
---
a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
+++
b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
@@ -211,7 +211,7 @@ public class StreamingMultiplexedChannel
if (logger.isDebugEnabled())
logger.debug("{} Sending {}", createLogTag(session), message);
- InetAddressAndPort connectTo = SystemKeyspace.getPreferredIP(to);
+ InetAddressAndPort connectTo = factory.supportsPreferredIp() ?
SystemKeyspace.getPreferredIP(to) : to;
return fileTransferExecutor.submit(new
FileStreamTask((OutgoingStreamMessage) message, connectTo));
}
diff --git a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
index 38a9e31c65..eef0ef4cba 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
@@ -53,7 +53,8 @@ public class BulkLoadConnectionFactory extends
NettyStreamingConnectionFactory
int messagingVersion,
StreamingChannel.Kind kind) throws
IOException
{
- // Supply a preferred address to the template, which will be
overwritten if encryption is configured.
+ // The preferred address is always overwritten in create(). This
method override only exists so we can avoid
+ // falling back to the NettyStreamingConnectionFactory implementation.
OutboundConnectionSettings template = new
OutboundConnectionSettings(getByAddress(to), getByAddress(preferred));
return create(template, messagingVersion, kind);
}
@@ -70,4 +71,9 @@ public class BulkLoadConnectionFactory extends
NettyStreamingConnectionFactory
return connect(template, messagingVersion, kind);
}
+ @Override
+ public boolean supportsPreferredIp()
+ {
+ return false; // called in a tool context, do not use getPreferredIP
+ }
}
diff --git a/test/unit/org/apache/cassandra/tools/ToolRunner.java
b/test/unit/org/apache/cassandra/tools/ToolRunner.java
index 6f5516099d..e3b9595e09 100644
--- a/test/unit/org/apache/cassandra/tools/ToolRunner.java
+++ b/test/unit/org/apache/cassandra/tools/ToolRunner.java
@@ -46,9 +46,7 @@ import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.utils.Pair;
-import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
-import org.assertj.core.util.Strings;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
@@ -402,8 +400,11 @@ public class ToolRunner
*/
public void assertCleanStdErr()
{
- assertTrue("Failed because cleaned stdErr wasn't empty: " +
getCleanedStderr(),
- getCleanedStderr().isEmpty());
+ String raw = getStderr();
+ String cleaned = getCleanedStderr();
+ assertTrue("Failed to clean stderr completely.\nRaw (length=" +
raw.length() + "):\n" + raw +
+ "\nCleaned (length=" + cleaned.length() + "):\n" +
cleaned,
+ cleaned.trim().isEmpty());
}
public void assertOnExitCode()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]