This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
     new 5e39c54678 Avoid initializing schema via SystemKeyspace.getPreferredIP() with the BulkLoader tool
5e39c54678 is described below

commit 5e39c54678b0d70145b5c699593eb9ce7fa7706a
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Wed Jun 22 17:12:41 2022 -0600

    Avoid initializing schema via SystemKeyspace.getPreferredIP() with the BulkLoader tool
    
    patch by Caleb Rackliffe; reviewed by Jon Meredith for CASSANDRA-17740
    
    Co-authored-by: Caleb Rackliffe <[email protected]>
    Co-authored-by: Jon Meredith <[email protected]>
---
 CHANGES.txt                                                |  1 +
 src/java/org/apache/cassandra/db/SystemKeyspace.java       |  3 +++
 .../org/apache/cassandra/streaming/StreamingChannel.java   | 14 ++++++++++++++
 .../streaming/async/StreamingMultiplexedChannel.java       |  2 +-
 .../apache/cassandra/tools/BulkLoadConnectionFactory.java  |  8 +++++++-
 test/unit/org/apache/cassandra/tools/ToolRunner.java       |  9 +++++----
 6 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e1f6a2dd66..f469fe25b2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1-alpha2
+ * Avoid initializing schema via SystemKeyspace.getPreferredIP() with the BulkLoader tool (CASSANDRA-17740)
  * Uncomment prepared_statements_cache_size, key_cache_size, counter_cache_size, index_summary_capacity which were
    commented out by mistake in a previous patch
    Fix breaking change with cache_load_timeout; cache_load_timeout_seconds <=0 and cache_load_timeout=0 are equivalent
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 6fbbc3e621..1523720d53 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -42,6 +42,7 @@ import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -978,6 +979,8 @@ public final class SystemKeyspace
      */
     public static InetAddressAndPort getPreferredIP(InetAddressAndPort ep)
     {
+        Preconditions.checkState(DatabaseDescriptor.isDaemonInitialized()); // Make sure being used as a daemon, not a tool
+
         String req = "SELECT preferred_ip, preferred_port FROM system.%s WHERE peer=? AND peer_port = ?";
         UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.getAddress(), ep.getPort());
         if (!result.isEmpty() && result.one().has("preferred_ip"))
diff --git a/src/java/org/apache/cassandra/streaming/StreamingChannel.java b/src/java/org/apache/cassandra/streaming/StreamingChannel.java
index a638638bc2..6b623b8588 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingChannel.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingChannel.java
@@ -58,6 +58,20 @@ public interface StreamingChannel
             // Implementations can decide whether or not to do something with the preferred address.
             return create(to, messagingVersion, kind);
         }
+
+        /** Provide way to disable getPreferredIP() for tools without access to the system keyspace
+         *
+         * CASSANDRA-17663 moves calls to SystemKeyspace.getPreferredIP() outside of any threads
+         * that are regularly interrupted.  However the streaming subsystem is also used
+         * by the bulk loader tool, which does not have direct access to the local tables
+         * and uses the client metadata/queries to retrieve it.
+         *
+         * @return true if SystemKeyspace.getPreferredIP() should be used when connecting
+         */
+        default boolean supportsPreferredIp()
+        {
+            return true;
+        }
     }
 
     public enum Kind { CONTROL, FILE }
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
index c2e551edb6..38277e6741 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
@@ -211,7 +211,7 @@ public class StreamingMultiplexedChannel
             if (logger.isDebugEnabled())
                 logger.debug("{} Sending {}", createLogTag(session), message);
 
-            InetAddressAndPort connectTo = SystemKeyspace.getPreferredIP(to);
+            InetAddressAndPort connectTo = factory.supportsPreferredIp() ? SystemKeyspace.getPreferredIP(to) : to;
             return fileTransferExecutor.submit(new FileStreamTask((OutgoingStreamMessage) message, connectTo));
         }
 
diff --git a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
index 38a9e31c65..eef0ef4cba 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
@@ -53,7 +53,8 @@ public class BulkLoadConnectionFactory extends NettyStreamingConnectionFactory
                                    int messagingVersion,
                                    StreamingChannel.Kind kind) throws IOException
     {
-        // Supply a preferred address to the template, which will be overwritten if encryption is configured.
+        // The preferred address is always overwritten in create(). This method override only exists so we can avoid 
+        // falling back to the NettyStreamingConnectionFactory implementation.
         OutboundConnectionSettings template = new OutboundConnectionSettings(getByAddress(to), getByAddress(preferred));
         return create(template, messagingVersion, kind);
     }
@@ -70,4 +71,9 @@ public class BulkLoadConnectionFactory extends NettyStreamingConnectionFactory
 
         return connect(template, messagingVersion, kind);
     }
+    @Override
+    public boolean supportsPreferredIp()
+    {
+        return false; // called in a tool context, do not use getPreferredIP
+    }
 }
diff --git a/test/unit/org/apache/cassandra/tools/ToolRunner.java b/test/unit/org/apache/cassandra/tools/ToolRunner.java
index 6f5516099d..e3b9595e09 100644
--- a/test/unit/org/apache/cassandra/tools/ToolRunner.java
+++ b/test/unit/org/apache/cassandra/tools/ToolRunner.java
@@ -46,9 +46,7 @@ import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.utils.Pair;
-import org.assertj.core.api.Assertions;
 import org.assertj.core.util.Lists;
-import org.assertj.core.util.Strings;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -402,8 +400,11 @@ public class ToolRunner
          */
         public void assertCleanStdErr()
         {
-            assertTrue("Failed because cleaned stdErr wasn't empty: " + getCleanedStderr(),
-                       getCleanedStderr().isEmpty());
+            String raw = getStderr();
+            String cleaned = getCleanedStderr();
+            assertTrue("Failed to clean stderr completely.\nRaw (length=" + raw.length() + "):\n" + raw + 
+                       "\nCleaned (length=" + cleaned.length() + "):\n" + cleaned,
+                       cleaned.trim().isEmpty());
         }
 
         public void assertOnExitCode()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to