This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 38b4990  Upgrading Guava to 27, and to java-driver 3.6.0 (from 3.4.0-SNAPSHOT), plus refactoring to remove nativePort argument for NativeSSTableLoaderClient constructor
38b4990 is described below

commit 38b49904dd1c71fcb16abfbc205edfd6ce008b76
Author: Sumanth Pasupuleti <[email protected]>
AuthorDate: Tue Apr 10 15:01:48 2018 -0700

    Upgrading Guava to 27, and to java-driver 3.6.0 (from 3.4.0-SNAPSHOT), plus refactoring to remove nativePort argument for NativeSSTableLoaderClient constructor
    
    Patch by Sumanth Pasupuleti; reviewed by Michael Semb Wever for CASSANDRA-14655
---
 CHANGES.txt                                        |   1 +
 build.xml                                          |  12 +---
 ...cassandra-driver-core-3.4.0-SNAPSHOT-shaded.jar | Bin 2624086 -> 0 bytes
 lib/cassandra-driver-core-3.6.0-shaded.jar         | Bin 0 -> 2725381 bytes
 lib/guava-23.3-jre.jar                             | Bin 2655564 -> 0 bytes
 lib/guava-27.0-jre.jar                             | Bin 0 -> 2747878 bytes
 .../apache/cassandra/audit/AuditLogManager.java    |   1 -
 .../cassandra/hadoop/cql3/CqlBulkRecordWriter.java |   4 +-
 .../cassandra/hadoop/cql3/CqlConfigHelper.java     |  12 ----
 .../cassandra/index/SecondaryIndexManager.java     |   6 +-
 .../cassandra/io/sstable/format/SSTableFormat.java |   2 +-
 .../org/apache/cassandra/repair/RepairJob.java     |   4 +-
 .../apache/cassandra/repair/RepairRunnable.java    |   6 +-
 .../org/apache/cassandra/repair/RepairSession.java |   2 +-
 .../repair/consistent/CoordinatorSession.java      |   2 +-
 .../cassandra/repair/consistent/LocalSessions.java |   3 +-
 .../apache/cassandra/service/StorageService.java   |   6 +-
 .../cassandra/streaming/StreamResultFuture.java    |   3 +-
 .../org/apache/cassandra/tools/BulkLoader.java     |  10 +--
 .../org/apache/cassandra/tools/LoaderOptions.java  |  72 +++++++++------------
 .../cassandra/utils/NativeSSTableLoaderClient.java |  29 ++-------
 test/unit/org/apache/cassandra/cql3/CQLTester.java |   4 ++
 .../db/repair/PendingAntiCompactionTest.java       |   2 +-
 .../apache/cassandra/hints/HintsServiceTest.java   |   3 +-
 .../org/apache/cassandra/net/MockMessagingSpy.java |   7 +-
 .../cassandra/streaming/StreamingTransferTest.java |   4 +-
 .../org/apache/cassandra/tools/BulkLoaderTest.java |  72 +++++++++++++++++++++
 .../cassandra/stress/settings/SettingsNode.java    |   6 +-
 .../cassandra/stress/util/JavaDriverClient.java    |   4 --
 29 files changed, 150 insertions(+), 127 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 1ecda8b..a72386a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha2
+ * Upgrade Guava to 27, and to java-driver 3.6.0 (from 3.4.0-SNAPSHOT) 
(CASSANDRA-14655)
  * Extract an AbstractCompactionController to allow for custom implementations 
(CASSANDRA-15286)
  * Move chronicle-core version from snapshot to stable, and include 
carrotsearch in generated pom.xml (CASSANDRA-15321)
  * Untangle RepairMessage sub-hierarchy of messages, use new messaging (more) 
correctly (CASSANDRA-15163)
diff --git a/build.xml b/build.xml
index 3eb074f..73539df 100644
--- a/build.xml
+++ b/build.xml
@@ -487,7 +487,7 @@
           <dependency groupId="org.lz4" artifactId="lz4-java" version="1.4.0"/>
           <dependency groupId="com.ning" artifactId="compress-lzf" 
version="0.8.4"/>
           <dependency groupId="com.github.luben" artifactId="zstd-jni" 
version="1.3.8-5"/>
-          <dependency groupId="com.google.guava" artifactId="guava" 
version="23.3-jre"/>
+          <dependency groupId="com.google.guava" artifactId="guava" 
version="27.0-jre"/>
           <dependency groupId="org.hdrhistogram" artifactId="HdrHistogram" 
version="2.1.9"/>
           <dependency groupId="commons-cli" artifactId="commons-cli" 
version="1.1"/>
           <dependency groupId="commons-codec" artifactId="commons-codec" 
version="1.9"/>
@@ -557,15 +557,13 @@
           <dependency groupId="net.openhft" artifactId="chronicle-threads" 
version="${chronicle-threads.version}"/>
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" 
version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" 
version="2.5.2" />
-         <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
-          <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" version="3.4.0-SNAPSHOT" classifier="shaded">
+          <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" version="3.6.0" classifier="shaded">
             <exclusion groupId="io.netty" artifactId="netty-buffer"/>
             <exclusion groupId="io.netty" artifactId="netty-codec"/>
             <exclusion groupId="io.netty" artifactId="netty-handler"/>
             <exclusion groupId="io.netty" artifactId="netty-transport"/>
             <exclusion groupId="org.slf4j" artifactId="slf4j-api"/>
           </dependency>
-         -->
           <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj" 
version="${ecj.version}" />
           <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" 
version="${ohc.version}">
             <exclusion groupId="org.slf4j" artifactId="slf4j-api"/>
@@ -647,9 +645,7 @@
         <dependency groupId="org.apache.hadoop" 
artifactId="hadoop-minicluster"/>
         <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
         <dependency groupId="org.antlr" artifactId="antlr"/>
-       <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
         <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" classifier="shaded"/>
-       -->
         <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
         <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
         <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8"/>
@@ -666,9 +662,7 @@
                 artifactId="cassandra-parent"
                 version="${version}"/>
         <dependency groupId="junit" artifactId="junit"/>
-       <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
         <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" classifier="shaded"/>
-       -->
         <dependency groupId="io.netty" artifactId="netty-all"/>
         <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
         <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
@@ -743,14 +737,12 @@
         <dependency groupId="org.apache.hadoop" 
artifactId="hadoop-minicluster" optional="true"/>
 
         <!-- don't need the Java Driver to run, but if you use the hadoop 
stuff or UDFs -->
-       <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
         <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" classifier="shaded" optional="true">
           <exclusion groupId="io.netty" artifactId="netty-buffer"/>
           <exclusion groupId="io.netty" artifactId="netty-codec"/>
           <exclusion groupId="io.netty" artifactId="netty-handler"/>
           <exclusion groupId="io.netty" artifactId="netty-transport"/>
         </dependency>
-       -->
 
         <!-- don't need jna to run, but nice to have -->
         <dependency groupId="net.java.dev.jna" artifactId="jna"/>
diff --git a/lib/cassandra-driver-core-3.4.0-SNAPSHOT-shaded.jar 
b/lib/cassandra-driver-core-3.4.0-SNAPSHOT-shaded.jar
deleted file mode 100644
index 1290dc3..0000000
Binary files a/lib/cassandra-driver-core-3.4.0-SNAPSHOT-shaded.jar and 
/dev/null differ
diff --git a/lib/cassandra-driver-core-3.6.0-shaded.jar 
b/lib/cassandra-driver-core-3.6.0-shaded.jar
new file mode 100644
index 0000000..ea06b02
Binary files /dev/null and b/lib/cassandra-driver-core-3.6.0-shaded.jar differ
diff --git a/lib/guava-23.3-jre.jar b/lib/guava-23.3-jre.jar
deleted file mode 100644
index b13e275..0000000
Binary files a/lib/guava-23.3-jre.jar and /dev/null differ
diff --git a/lib/guava-27.0-jre.jar b/lib/guava-27.0-jre.jar
new file mode 100644
index 0000000..1bdd007
Binary files /dev/null and b/lib/guava-27.0-jre.jar differ
diff --git a/src/java/org/apache/cassandra/audit/AuditLogManager.java 
b/src/java/org/apache/cassandra/audit/AuditLogManager.java
index d11eaa0..bc2126c 100644
--- a/src/java/org/apache/cassandra/audit/AuditLogManager.java
+++ b/src/java/org/apache/cassandra/audit/AuditLogManager.java
@@ -37,7 +37,6 @@ import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.exceptions.AuthenticationException;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
-import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.utils.FBUtilities;
 
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 4bbb861..77ad95f 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -287,12 +287,10 @@ public class CqlBulkRecordWriter extends 
RecordWriter<Object, List<ByteBuffer>>
         public ExternalClient(Configuration conf)
         {
             super(resolveHostAddresses(conf),
-                  CqlConfigHelper.getOutputNativePort(conf),
                   ConfigHelper.getOutputInitialPort(conf),
                   ConfigHelper.getOutputKeyspaceUserName(conf),
                   ConfigHelper.getOutputKeyspacePassword(conf),
-                  CqlConfigHelper.getSSLOptions(conf).orNull(),
-                  CqlConfigHelper.getAllowServerPortDiscovery(conf));
+                  CqlConfigHelper.getSSLOptions(conf).orNull());
         }
 
         private static Collection<InetSocketAddress> 
resolveHostAddresses(Configuration conf)
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 3a47a72..f9a6f3a 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -88,7 +88,6 @@ public class CqlConfigHelper
 
     private static final String OUTPUT_CQL = "cassandra.output.cql";
     private static final String OUTPUT_NATIVE_PORT = 
"cassandra.output.native.port";
-    private static final String ALLOW_SERVER_PORT_DISCOVERY = 
"cassandra.allowserverportdiscovery";
 
     /**
      * Set the CQL columns for the input of this job.
@@ -652,15 +651,4 @@ public class CqlConfigHelper
                  new SecureRandom());
         return ctx;
     }
-
-    public static void setAllowServerPortDiscovery(Configuration conf, boolean 
allowServerPortDiscovery)
-    {
-        conf.set(ALLOW_SERVER_PORT_DISCOVERY, 
Boolean.toString(allowServerPortDiscovery));
-    }
-
-    public static boolean getAllowServerPortDiscovery(Configuration conf)
-    {
-        return Boolean.parseBoolean(conf.get(ALLOW_SERVER_PORT_DISCOVERY, 
"false"));
-    }
-
 }
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java 
b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 60fc3ba..336e2ea 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -487,7 +487,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
                                        builtIndexes.addAll(groupedIndexes);
                                        build.set(o);
                                    }
-                               });
+                               }, MoreExecutors.directExecutor());
                                futures.add(build);
                            });
 
@@ -1444,7 +1444,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
         if (null != task)
         {
             ListenableFuture<?> f = blockingExecutor.submit(task);
-            if (callback != null) Futures.addCallback(f, callback);
+            if (callback != null) Futures.addCallback(f, callback, 
MoreExecutors.directExecutor());
             FBUtilities.waitOnFuture(f);
         }
     }
@@ -1464,7 +1464,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
                              if (null != task)
                              {
                                  ListenableFuture<?> f = 
blockingExecutor.submit(task);
-                                 if (callback != null) Futures.addCallback(f, 
callback);
+                                 if (callback != null) Futures.addCallback(f, 
callback, MoreExecutors.directExecutor());
                                  waitFor.add(f);
                              }
                          });
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
index 38a7f57..14f6602 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
@@ -57,7 +57,7 @@ public interface SSTableFormat
         {
             //Since format comes right after generation
             //we disallow formats with numeric names
-            assert !CharMatcher.DIGIT.matchesAllOf(name);
+            assert !CharMatcher.digit().matchesAllOf(name);
 
             this.name = name;
             this.info = info;
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java 
b/src/java/org/apache/cassandra/repair/RepairJob.java
index f682bfb..3740070 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -391,7 +391,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
 
                 // failure is handled at root of job chain
                 public void onFailure(Throwable t) {}
-            });
+            }, MoreExecutors.directExecutor());
             currentTask = nextTask;
         }
         // start running tasks
@@ -448,7 +448,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
 
                     // failure is handled at root of job chain
                     public void onFailure(Throwable t) {}
-                });
+                }, MoreExecutors.directExecutor());
                 currentTask = nextTask;
             }
             // start running tasks
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 7e931e8..f1f12dd 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -388,7 +388,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         {
             ranges.addAll(range);
         }
-        Futures.addCallback(repairResult, new 
RepairCompleteCallback(parentSession, ranges, startTime, traceState, 
hasFailure, executor));
+        Futures.addCallback(repairResult, new 
RepairCompleteCallback(parentSession, ranges, startTime, traceState, 
hasFailure, executor), MoreExecutors.directExecutor());
     }
 
     private void previewRepair(UUID parentSession,
@@ -463,7 +463,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
                 executor.shutdownNow();
                 return message;
             }
-        });
+        }, MoreExecutors.directExecutor());
     }
 
     private ListenableFuture<List<RepairSessionResult>> 
submitRepairSessions(UUID parentSession,
@@ -493,7 +493,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
                                                                                
      cfnames);
             if (session == null)
                 continue;
-            Futures.addCallback(session, new RepairSessionCallback(session));
+            Futures.addCallback(session, new RepairSessionCallback(session), 
MoreExecutors.directExecutor());
             futures.add(session);
         }
         return Futures.successfulAsList(futures);
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index 40f3dbe..3483e59 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -344,7 +344,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
                 Tracing.traceRepair("Session completed with the following 
error: {}", t);
                 forceShutdown(t);
             }
-        });
+        }, MoreExecutors.directExecutor());
     }
 
     public void terminate()
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java 
b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index 8f1759a..39549bd 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -382,7 +382,7 @@ public class CoordinatorSession extends ConsistentSession
                     resultFuture.setException(t);
                 }
             }
-        });
+        }, MoreExecutors.directExecutor());
 
         return resultFuture;
     }
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java 
b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 935bba8..6475794 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -48,6 +48,7 @@ import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 
 import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.slf4j.Logger;
@@ -666,7 +667,7 @@ public class LocalSessions
                     executor.shutdown();
                 }
             }
-        });
+        }, MoreExecutors.directExecutor());
     }
 
     public void maybeSetRepairing(UUID sessionID)
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index a7c125c..777e40e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1584,7 +1584,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             {
                 logger.warn("Error during bootstrap.", e);
             }
-        });
+        }, MoreExecutors.directExecutor());
         try
         {
             bootstrapStream.get();
@@ -1679,7 +1679,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                     progressSupport.progress("bootstrap", new 
ProgressEvent(ProgressEventType.ERROR, 1, 1, message));
                     progressSupport.progress("bootstrap", new 
ProgressEvent(ProgressEventType.COMPLETE, 1, 1, "Resume bootstrap complete"));
                 }
-            });
+            }, MoreExecutors.directExecutor());
             return true;
         }
         else
@@ -2958,7 +2958,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 // We still want to send the notification
                 sendReplicationNotification(notifyEndpoint);
             }
-        });
+        }, MoreExecutors.directExecutor());
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java 
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 4de63be..3268ecf 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -145,7 +146,7 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
 
     public void addEventListener(StreamEventHandler listener)
     {
-        Futures.addCallback(this, listener);
+        Futures.addCallback(this, listener, MoreExecutors.directExecutor());
         eventListeners.add(listener);
     }
 
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java 
b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 2ca2a3d..42c2bf3 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -57,13 +57,11 @@ public class BulkLoader
                 options.directory.getAbsoluteFile(),
                 new ExternalClient(
                         options.hosts,
-                        options.nativePort,
                         options.storagePort,
                         options.authProvider,
                         options.sslStoragePort,
                         options.serverEncOptions,
-                        buildSSLOptions(options.clientEncOptions),
-                        options.allowServerPortDiscovery),
+                        buildSSLOptions(options.clientEncOptions)),
                         handler,
                         options.connectionsPerHost,
                         options.targetKeyspace);
@@ -275,15 +273,13 @@ public class BulkLoader
         private final EncryptionOptions.ServerEncryptionOptions 
serverEncOptions;
 
         public ExternalClient(Set<InetSocketAddress> hosts,
-                              int nativePort,
                               int storagePort,
                               AuthProvider authProvider,
                               int sslStoragePort,
                               EncryptionOptions.ServerEncryptionOptions 
serverEncryptionOptions,
-                              SSLOptions sslOptions,
-                              boolean allowServerPortDiscovery)
+                              SSLOptions sslOptions)
         {
-            super(hosts, nativePort, storagePort, authProvider, sslOptions, 
allowServerPortDiscovery);
+            super(hosts, storagePort, authProvider, sslOptions);
             this.sslStoragePort = sslStoragePort;
             serverEncOptions = serverEncryptionOptions;
         }
diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java 
b/src/java/org/apache/cassandra/tools/LoaderOptions.java
index 7ad3299..9acf8a5 100644
--- a/src/java/org/apache/cassandra/tools/LoaderOptions.java
+++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java
@@ -59,7 +59,6 @@ public class LoaderOptions
     public static final String THROTTLE_MBITS = "throttle";
     public static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle";
     public static final String TOOL_NAME = "sstableloader";
-    public static final String ALLOW_SERVER_PORT_DISCOVERY_OPTION = 
"server-port-discovery";
     public static final String TARGET_KEYSPACE = "target-keyspace";
 
     /* client encryption options */
@@ -89,7 +88,6 @@ public class LoaderOptions
     public final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
     public final Set<InetSocketAddress> hosts;
     public final Set<InetAddressAndPort> ignores;
-    public final boolean allowServerPortDiscovery;
     public final String targetKeyspace;
 
     LoaderOptions(Builder builder)
@@ -110,7 +108,6 @@ public class LoaderOptions
         connectionsPerHost = builder.connectionsPerHost;
         serverEncOptions = builder.serverEncOptions;
         hosts = builder.hosts;
-        allowServerPortDiscovery = builder.allowServerPortDiscovery;
         ignores = builder.ignores;
         targetKeyspace = builder.targetKeyspace;
     }
@@ -137,7 +134,6 @@ public class LoaderOptions
         Set<InetAddress> ignoresArg = new HashSet<>();
         Set<InetSocketAddress> hosts = new HashSet<>();
         Set<InetAddressAndPort> ignores = new HashSet<>();
-        boolean allowServerPortDiscovery;
         String targetKeyspace;
 
         Builder()
@@ -307,12 +303,6 @@ public class LoaderOptions
             return this;
         }
 
-        public Builder allowServerPortDiscovery(boolean 
allowServerPortDiscovery)
-        {
-            this.allowServerPortDiscovery = allowServerPortDiscovery;
-            return this;
-        }
-
         public Builder parseArgs(String cmdArgs[])
         {
             CommandLineParser parser = new GnuParser();
@@ -359,7 +349,6 @@ public class LoaderOptions
 
                 verbose = cmd.hasOption(VERBOSE_OPTION);
                 noProgress = cmd.hasOption(NOPROGRESS_OPTION);
-                allowServerPortDiscovery = 
cmd.hasOption(ALLOW_SERVER_PORT_DISCOVERY_OPTION);
 
                 if (cmd.hasOption(USER_OPTION))
                 {
@@ -376,6 +365,32 @@ public class LoaderOptions
                     authProviderName = 
cmd.getOptionValue(AUTH_PROVIDER_OPTION);
                 }
 
+                // try to load config file first, so that values can be
+                // rewritten with other option values.
+                // otherwise use default config.
+                Config config;
+                if (cmd.hasOption(CONFIG_PATH))
+                {
+                    File configFile = new 
File(cmd.getOptionValue(CONFIG_PATH));
+                    if (!configFile.exists())
+                    {
+                        errorMsg("Config file not found", options);
+                    }
+                    config = new 
YamlConfigurationLoader().loadConfig(configFile.toURI().toURL());
+                }
+                else
+                {
+                    config = new Config();
+                    // unthrottle stream by default
+                    config.stream_throughput_outbound_megabits_per_sec = 0;
+                    
config.inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
+                }
+
+                if (cmd.hasOption(NATIVE_PORT_OPTION))
+                    nativePort = 
Integer.parseInt(cmd.getOptionValue(NATIVE_PORT_OPTION));
+                else
+                    nativePort = config.native_transport_port;
+
                 if (cmd.hasOption(INITIAL_HOST_ADDRESS_OPTION))
                 {
                     String[] nodes = 
cmd.getOptionValue(INITIAL_HOST_ADDRESS_OPTION).split(",");
@@ -398,6 +413,11 @@ public class LoaderOptions
                     System.exit(1);
                 }
 
+                if (cmd.hasOption(STORAGE_PORT_OPTION))
+                    storagePort = 
Integer.parseInt(cmd.getOptionValue(STORAGE_PORT_OPTION));
+                else
+                    storagePort = config.storage_port;
+
                 if (cmd.hasOption(IGNORE_NODES_OPTION))
                 {
                     String[] nodes = 
cmd.getOptionValue(IGNORE_NODES_OPTION).split(",");
@@ -418,35 +438,6 @@ public class LoaderOptions
                     connectionsPerHost = 
Integer.parseInt(cmd.getOptionValue(CONNECTIONS_PER_HOST));
                 }
 
-                // try to load config file first, so that values can be
-                // rewritten with other option values.
-                // otherwise use default config.
-                Config config;
-                if (cmd.hasOption(CONFIG_PATH))
-                {
-                    File configFile = new 
File(cmd.getOptionValue(CONFIG_PATH));
-                    if (!configFile.exists())
-                    {
-                        errorMsg("Config file not found", options);
-                    }
-                    config = new 
YamlConfigurationLoader().loadConfig(configFile.toURI().toURL());
-                }
-                else
-                {
-                    config = new Config();
-                    // unthrottle stream by default
-                    config.stream_throughput_outbound_megabits_per_sec = 0;
-                    
config.inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
-                }
-
-                if (cmd.hasOption(NATIVE_PORT_OPTION))
-                    nativePort = 
Integer.parseInt(cmd.getOptionValue(NATIVE_PORT_OPTION));
-                else
-                    nativePort = config.native_transport_port;
-                if (cmd.hasOption(STORAGE_PORT_OPTION))
-                    storagePort = 
Integer.parseInt(cmd.getOptionValue(STORAGE_PORT_OPTION));
-                else
-                    storagePort = config.storage_port;
                 if (cmd.hasOption(SSL_STORAGE_PORT_OPTION))
                     sslStoragePort = 
Integer.parseInt(cmd.getOptionValue(SSL_STORAGE_PORT_OPTION));
                 else
@@ -626,7 +617,6 @@ public class LoaderOptions
         options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "Client SSL: 
type of store");
         options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", 
"Client SSL: comma-separated list of encryption suites to use");
         options.addOption("f", CONFIG_PATH, "path to config file", 
"cassandra.yaml file path for streaming throughput and client/server SSL.");
-        options.addOption("spd", ALLOW_SERVER_PORT_DISCOVERY_OPTION, "allow 
server port discovery", "Use ports published by server to decide how to 
connect. With SSL requires StartTLS to be used.");
         options.addOption("k", TARGET_KEYSPACE, "target keyspace name", 
"target keyspace name");
         return options;
     }
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java 
b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index bb0ee25..9763d7e 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -46,37 +46,28 @@ public class NativeSSTableLoaderClient extends 
SSTableLoader.Client
 {
     protected final Map<String, TableMetadataRef> tables;
     private final Collection<InetSocketAddress> hosts;
-    private final int port;
     private final int storagePort;
     private final AuthProvider authProvider;
     private final SSLOptions sslOptions;
-    private final boolean allowServerPortDiscovery;
 
-
-    public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int 
nativePort, int storagePort, String username, String password, SSLOptions 
sslOptions, boolean allowServerPortDiscovery)
+    public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int 
storagePort, String username, String password, SSLOptions sslOptions)
     {
-        this(hosts, nativePort, storagePort, new 
PlainTextAuthProvider(username, password), sslOptions, 
allowServerPortDiscovery);
+        this(hosts, storagePort, new PlainTextAuthProvider(username, 
password), sslOptions);
     }
 
-    public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int 
nativePort, int storagePort, AuthProvider authProvider, SSLOptions sslOptions, 
boolean allowServerPortDiscovery)
+    public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int 
storagePort, AuthProvider authProvider, SSLOptions sslOptions)
     {
         super();
         this.tables = new HashMap<>();
         this.hosts = hosts;
-        this.port = nativePort;
         this.authProvider = authProvider;
         this.sslOptions = sslOptions;
-        this.allowServerPortDiscovery = allowServerPortDiscovery;
         this.storagePort = storagePort;
     }
 
     public void init(String keyspace)
     {
-        Set<InetAddress> hostAddresses = hosts.stream().map(host -> 
host.getAddress()).collect(Collectors.toSet());
-        Cluster.Builder builder = 
Cluster.builder().addContactPoints(hostAddresses).withPort(port).allowBetaProtocolVersion();
-
-        if (allowServerPortDiscovery)
-            builder = builder.allowServerPortDiscovery();
+        Cluster.Builder builder = 
Cluster.builder().addContactPointsWithPorts(hosts).allowBetaProtocolVersion();
 
         if (sslOptions != null)
             builder.withSSL(sslOptions);
@@ -100,15 +91,9 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
                                                  tokenFactory.fromString(tokenRange.getEnd().getValue().toString()));
                 for (Host endpoint : endpoints)
                 {
-                    int portToUse;
-                    if (allowServerPortDiscovery)
-                    {
-                        portToUse = endpoint.getBroadcastAddressOptPort().portOrElse(storagePort);
-                    }
-                    else
-                    {
-                        portToUse = storagePort;
-                    }
+                    int broadcastPort = endpoint.getBroadcastSocketAddress().getPort();
+                    // use port from broadcast address if set.
+                    int portToUse = broadcastPort != 0 ? broadcastPort : storagePort;
                     addRangeForEndpoint(range, InetAddressAndPort.getByNameOverrideDefaults(endpoint.getAddress().getHostAddress(), portToUse));
                 }
             }
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 9c4f22e..8f3a52a 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -47,6 +47,8 @@ import com.datastax.driver.core.ResultSet;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
 import org.apache.cassandra.index.SecondaryIndexManager;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -401,6 +403,8 @@ public abstract class CQLTester
             return;
 
         SystemKeyspace.finishStartup();
+        VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
+
         StorageService.instance.initServer();
         SchemaLoader.startGossiper();
 
diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index b140813..6bef001 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -552,7 +552,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
                     public void onFailure(Throwable throwable)
                     {
                     }
-                });
+                }, MoreExecutors.directExecutor());
                 assertTrue(cdl.await(1, TimeUnit.MINUTES));
             }
         }
diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
index 7778331..dddf336 100644
--- a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -129,7 +130,7 @@ public class HintsServiceTest
             {
                 HintsService.instance.resumeDispatch();
             }
-        });
+        }, MoreExecutors.directExecutor());
 
         Futures.allAsList(
                 noMessagesWhilePaused,
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingSpy.java b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
index bf4c226..c61c301 100644
--- a/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
+++ b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +54,7 @@ public class MockMessagingSpy
      */
     public ListenableFuture<Message<?>> captureMockedMessage()
     {
-        return Futures.transform(captureMockedMessageN(1), (List<Message<?>> result) -> result.isEmpty() ? null : result.get(0));
+        return Futures.transform(captureMockedMessageN(1), (List<Message<?>> result) -> result.isEmpty() ? null : result.get(0), MoreExecutors.directExecutor());
     }
 
     /**
@@ -89,7 +90,7 @@ public class MockMessagingSpy
      */
     public ListenableFuture<Message<?>> captureMessageOut()
     {
-        return Futures.transform(captureMessageOut(1), (List<Message<?>> result) -> result.isEmpty() ? null : result.get(0));
+        return Futures.transform(captureMessageOut(1), (List<Message<?>> result) -> result.isEmpty() ? null : result.get(0), MoreExecutors.directExecutor());
     }
 
     /**
@@ -231,4 +232,4 @@ public class MockMessagingSpy
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 909e221..d88f379 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+
 import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -130,7 +132,7 @@ public class StreamingTransferTest
             {
                 fail();
             }
-        });
+        }, MoreExecutors.directExecutor());
         // should be complete immediately
         futureResult.get(100, TimeUnit.MILLISECONDS);
     }
diff --git a/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java b/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java
index 104f288..6ed38a0 100644
--- a/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java
+++ b/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java
@@ -63,4 +63,76 @@ public class BulkLoaderTest extends ToolsTester
         assertKeyspaceNotLoaded();
         assertServerNotLoaded();
     }
+
+    @Test
+    public void testBulkLoader_WithArgs1() throws Exception
+    {
+        try
+        {
+            runTool(0, "org.apache.cassandra.tools.BulkLoader", "-d", "127.9.9.1", "--port", "9042", sstableDirName("legacy_sstables", "legacy_ma_simple"));
+            fail();
+        }
+        catch (RuntimeException e)
+        {
+            if (!(e.getCause() instanceof BulkLoadException))
+                throw e;
+            if (!(e.getCause().getCause() instanceof NoHostAvailableException))
+                throw e;
+        }
+        assertNoUnexpectedThreadsStarted(null, new String[]{"globalEventExecutor-1-1", "globalEventExecutor-1-2"});
+        assertSchemaNotLoaded();
+        assertCLSMNotLoaded();
+        assertSystemKSNotLoaded();
+        assertKeyspaceNotLoaded();
+        assertServerNotLoaded();
+    }
+
+    @Test
+    public void testBulkLoader_WithArgs2() throws Exception
+    {
+        try
+        {
+            runTool(0, "org.apache.cassandra.tools.BulkLoader", "-d", "127.9.9.1:9042", "--port", "9041", sstableDirName("legacy_sstables", "legacy_ma_simple"));
+            fail();
+        }
+        catch (RuntimeException e)
+        {
+            if (!(e.getCause() instanceof BulkLoadException))
+                throw e;
+            if (!(e.getCause().getCause() instanceof NoHostAvailableException))
+                throw e;
+        }
+        assertNoUnexpectedThreadsStarted(null, new String[]{"globalEventExecutor-1-1", "globalEventExecutor-1-2"});
+        assertSchemaNotLoaded();
+        assertCLSMNotLoaded();
+        assertSystemKSNotLoaded();
+        assertKeyspaceNotLoaded();
+        assertServerNotLoaded();
+    }
+
+    @Test(expected = NoHostAvailableException.class)
+    public void testBulkLoader_WithArgs3() throws Throwable
+    {
+        try
+        {
+            runTool(1, "org.apache.cassandra.tools.BulkLoader", "-d", "127.9.9.1", "--port", "9041", sstableDirName("legacy_sstables", "legacy_ma_simple"));
+        }
+        catch (RuntimeException e)
+        {
+            throw e.getCause().getCause();
+        }
+    }
+
+    @Test(expected = NoHostAvailableException.class)
+    public void testBulkLoader_WithArgs4() throws Throwable
+    {
+        try
+        {
+            runTool(1, "org.apache.cassandra.tools.BulkLoader", "-d", "127.9.9.1:9041", sstableDirName("legacy_sstables", "legacy_ma_simple"));
+        }
+        catch (RuntimeException e)
+        {
+            throw e.getCause().getCause();
+        }
+    }
 }
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
index 24c10bf..8a484c7 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
@@ -39,7 +39,6 @@ public class SettingsNode implements Serializable
     public final List<String> nodes;
     public final boolean isWhiteList;
     public final String datacenter;
-    public final boolean allowServerPortDiscovery;
 
     public SettingsNode(Options options)
     {
@@ -72,7 +71,6 @@ public class SettingsNode implements Serializable
 
         isWhiteList = options.whitelist.setByUser();
         datacenter = options.datacenter.value();
-        allowServerPortDiscovery = options.allowServerPortDiscovery.setByUser();
     }
 
     public Set<String> resolveAllPermitted(StressSettings settings)
@@ -145,13 +143,12 @@ public class SettingsNode implements Serializable
         final OptionSimple datacenter = new OptionSimple("datacenter=", ".*", null, "Datacenter used for DCAwareRoundRobinLoadPolicy", false);
         final OptionSimple whitelist = new OptionSimple("whitelist", "", null, "Limit communications to the provided nodes", false);
         final OptionSimple file = new OptionSimple("file=", ".*", null, "Node file (one per line)", false);
-        final OptionSimple allowServerPortDiscovery = new OptionSimple("allow_server_port_discovery", "", null, "Allow Java client to discover server client port numbers", false);
         final OptionSimple list = new OptionSimple("", "[^=,]+(,[^=,]+)*", "localhost", "comma delimited list of nodes", false);
 
         @Override
         public List<? extends Option> options()
         {
-            return Arrays.asList(datacenter, whitelist, file, allowServerPortDiscovery, list);
+            return Arrays.asList(datacenter, whitelist, file, list);
         }
     }
 
@@ -161,7 +158,6 @@ public class SettingsNode implements Serializable
         out.println("  Nodes: " + nodes);
         out.println("  Is White List: " + isWhiteList);
         out.println("  Datacenter: " + datacenter);
-        out.println("  Allow server port discovery: " + allowServerPortDiscovery);
     }
 
     public static SettingsNode get(Map<String, String[]> clArgs)
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
index 36361f7..72487dc 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
@@ -54,7 +54,6 @@ public class JavaDriverClient
     private Cluster cluster;
     private Session session;
     private final LoadBalancingPolicy loadBalancingPolicy;
-    private final boolean allowServerPortDiscovery;
 
     private static final ConcurrentMap<String, PreparedStatement> stmts = new ConcurrentHashMap<>();
 
@@ -74,7 +73,6 @@ public class JavaDriverClient
         this.encryptionOptions = encryptionOptions;
         this.loadBalancingPolicy = loadBalancingPolicy(settings);
         this.connectionsPerHost = settings.mode.connectionsPerHost == null ? 8 : settings.mode.connectionsPerHost;
-        this.allowServerPortDiscovery = settings.node.allowServerPortDiscovery;
 
         int maxThreadCount = 0;
         if (settings.rate.auto)
@@ -136,8 +134,6 @@ public class JavaDriverClient
                                                 .withoutJMXReporting()
                                                 .withProtocolVersion(protocolVersion)
                                                 .withoutMetrics(); // The driver uses metrics 3 with conflict with our version
-        if (allowServerPortDiscovery)
-            clusterBuilder = clusterBuilder.allowServerPortDiscovery();
 
         if (loadBalancingPolicy != null)
             clusterBuilder.withLoadBalancingPolicy(loadBalancingPolicy);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to