This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 38b4990 Upgrading Guava to 27, and to java-driver 3.6.0 (from
3.4.0-SNAPSHOT), plus refactoring to remove nativePort argument for
NativeSSTableLoaderClient constructor
38b4990 is described below
commit 38b49904dd1c71fcb16abfbc205edfd6ce008b76
Author: Sumanth Pasupuleti <[email protected]>
AuthorDate: Tue Apr 10 15:01:48 2018 -0700
Upgrading Guava to 27, and to java-driver 3.6.0 (from 3.4.0-SNAPSHOT), plus
refactoring to remove nativePort argument for NativeSSTableLoaderClient
constructor
Patch by Sumanth Pasupuleti; reviewed by Michael Semb Wever for
CASSANDRA-14655
---
CHANGES.txt | 1 +
build.xml | 12 +---
...cassandra-driver-core-3.4.0-SNAPSHOT-shaded.jar | Bin 2624086 -> 0 bytes
lib/cassandra-driver-core-3.6.0-shaded.jar | Bin 0 -> 2725381 bytes
lib/guava-23.3-jre.jar | Bin 2655564 -> 0 bytes
lib/guava-27.0-jre.jar | Bin 0 -> 2747878 bytes
.../apache/cassandra/audit/AuditLogManager.java | 1 -
.../cassandra/hadoop/cql3/CqlBulkRecordWriter.java | 4 +-
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 12 ----
.../cassandra/index/SecondaryIndexManager.java | 6 +-
.../cassandra/io/sstable/format/SSTableFormat.java | 2 +-
.../org/apache/cassandra/repair/RepairJob.java | 4 +-
.../apache/cassandra/repair/RepairRunnable.java | 6 +-
.../org/apache/cassandra/repair/RepairSession.java | 2 +-
.../repair/consistent/CoordinatorSession.java | 2 +-
.../cassandra/repair/consistent/LocalSessions.java | 3 +-
.../apache/cassandra/service/StorageService.java | 6 +-
.../cassandra/streaming/StreamResultFuture.java | 3 +-
.../org/apache/cassandra/tools/BulkLoader.java | 10 +--
.../org/apache/cassandra/tools/LoaderOptions.java | 72 +++++++++------------
.../cassandra/utils/NativeSSTableLoaderClient.java | 29 ++-------
test/unit/org/apache/cassandra/cql3/CQLTester.java | 4 ++
.../db/repair/PendingAntiCompactionTest.java | 2 +-
.../apache/cassandra/hints/HintsServiceTest.java | 3 +-
.../org/apache/cassandra/net/MockMessagingSpy.java | 7 +-
.../cassandra/streaming/StreamingTransferTest.java | 4 +-
.../org/apache/cassandra/tools/BulkLoaderTest.java | 72 +++++++++++++++++++++
.../cassandra/stress/settings/SettingsNode.java | 6 +-
.../cassandra/stress/util/JavaDriverClient.java | 4 --
29 files changed, 150 insertions(+), 127 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ecda8b..a72386a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha2
+ * Upgrade Guava to 27, and to java-driver 3.6.0 (from 3.4.0-SNAPSHOT)
(CASSANDRA-14655)
* Extract an AbstractCompactionController to allow for custom implementations
(CASSANDRA-15286)
* Move chronicle-core version from snapshot to stable, and include
carrotsearch in generated pom.xml (CASSANDRA-15321)
* Untangle RepairMessage sub-hierarchy of messages, use new messaging (more)
correctly (CASSANDRA-15163)
diff --git a/build.xml b/build.xml
index 3eb074f..73539df 100644
--- a/build.xml
+++ b/build.xml
@@ -487,7 +487,7 @@
<dependency groupId="org.lz4" artifactId="lz4-java" version="1.4.0"/>
<dependency groupId="com.ning" artifactId="compress-lzf"
version="0.8.4"/>
<dependency groupId="com.github.luben" artifactId="zstd-jni"
version="1.3.8-5"/>
- <dependency groupId="com.google.guava" artifactId="guava"
version="23.3-jre"/>
+ <dependency groupId="com.google.guava" artifactId="guava"
version="27.0-jre"/>
<dependency groupId="org.hdrhistogram" artifactId="HdrHistogram"
version="2.1.9"/>
<dependency groupId="commons-cli" artifactId="commons-cli"
version="1.1"/>
<dependency groupId="commons-codec" artifactId="commons-codec"
version="1.9"/>
@@ -557,15 +557,13 @@
<dependency groupId="net.openhft" artifactId="chronicle-threads"
version="${chronicle-threads.version}"/>
<dependency groupId="com.google.code.findbugs" artifactId="jsr305"
version="2.0.2" />
<dependency groupId="com.clearspring.analytics" artifactId="stream"
version="2.5.2" />
- <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
- <dependency groupId="com.datastax.cassandra"
artifactId="cassandra-driver-core" version="3.4.0-SNAPSHOT" classifier="shaded">
+ <dependency groupId="com.datastax.cassandra"
artifactId="cassandra-driver-core" version="3.6.0" classifier="shaded">
<exclusion groupId="io.netty" artifactId="netty-buffer"/>
<exclusion groupId="io.netty" artifactId="netty-codec"/>
<exclusion groupId="io.netty" artifactId="netty-handler"/>
<exclusion groupId="io.netty" artifactId="netty-transport"/>
<exclusion groupId="org.slf4j" artifactId="slf4j-api"/>
</dependency>
- -->
<dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"
version="${ecj.version}" />
<dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"
version="${ohc.version}">
<exclusion groupId="org.slf4j" artifactId="slf4j-api"/>
@@ -647,9 +645,7 @@
<dependency groupId="org.apache.hadoop"
artifactId="hadoop-minicluster"/>
<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
<dependency groupId="org.antlr" artifactId="antlr"/>
- <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
<dependency groupId="com.datastax.cassandra"
artifactId="cassandra-driver-core" classifier="shaded"/>
- -->
<dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
<dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
<dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8"/>
@@ -666,9 +662,7 @@
artifactId="cassandra-parent"
version="${version}"/>
<dependency groupId="junit" artifactId="junit"/>
- <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
<dependency groupId="com.datastax.cassandra"
artifactId="cassandra-driver-core" classifier="shaded"/>
- -->
<dependency groupId="io.netty" artifactId="netty-all"/>
<dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
<dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
@@ -743,14 +737,12 @@
<dependency groupId="org.apache.hadoop"
artifactId="hadoop-minicluster" optional="true"/>
<!-- don't need the Java Driver to run, but if you use the hadoop
stuff or UDFs -->
- <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
<dependency groupId="com.datastax.cassandra"
artifactId="cassandra-driver-core" classifier="shaded" optional="true">
<exclusion groupId="io.netty" artifactId="netty-buffer"/>
<exclusion groupId="io.netty" artifactId="netty-codec"/>
<exclusion groupId="io.netty" artifactId="netty-handler"/>
<exclusion groupId="io.netty" artifactId="netty-transport"/>
</dependency>
- -->
<!-- don't need jna to run, but nice to have -->
<dependency groupId="net.java.dev.jna" artifactId="jna"/>
diff --git a/lib/cassandra-driver-core-3.4.0-SNAPSHOT-shaded.jar
b/lib/cassandra-driver-core-3.4.0-SNAPSHOT-shaded.jar
deleted file mode 100644
index 1290dc3..0000000
Binary files a/lib/cassandra-driver-core-3.4.0-SNAPSHOT-shaded.jar and
/dev/null differ
diff --git a/lib/cassandra-driver-core-3.6.0-shaded.jar
b/lib/cassandra-driver-core-3.6.0-shaded.jar
new file mode 100644
index 0000000..ea06b02
Binary files /dev/null and b/lib/cassandra-driver-core-3.6.0-shaded.jar differ
diff --git a/lib/guava-23.3-jre.jar b/lib/guava-23.3-jre.jar
deleted file mode 100644
index b13e275..0000000
Binary files a/lib/guava-23.3-jre.jar and /dev/null differ
diff --git a/lib/guava-27.0-jre.jar b/lib/guava-27.0-jre.jar
new file mode 100644
index 0000000..1bdd007
Binary files /dev/null and b/lib/guava-27.0-jre.jar differ
diff --git a/src/java/org/apache/cassandra/audit/AuditLogManager.java
b/src/java/org/apache/cassandra/audit/AuditLogManager.java
index d11eaa0..bc2126c 100644
--- a/src/java/org/apache/cassandra/audit/AuditLogManager.java
+++ b/src/java/org/apache/cassandra/audit/AuditLogManager.java
@@ -37,7 +37,6 @@ import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.exceptions.AuthenticationException;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.UnauthorizedException;
-import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.utils.FBUtilities;
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 4bbb861..77ad95f 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -287,12 +287,10 @@ public class CqlBulkRecordWriter extends
RecordWriter<Object, List<ByteBuffer>>
public ExternalClient(Configuration conf)
{
super(resolveHostAddresses(conf),
- CqlConfigHelper.getOutputNativePort(conf),
ConfigHelper.getOutputInitialPort(conf),
ConfigHelper.getOutputKeyspaceUserName(conf),
ConfigHelper.getOutputKeyspacePassword(conf),
- CqlConfigHelper.getSSLOptions(conf).orNull(),
- CqlConfigHelper.getAllowServerPortDiscovery(conf));
+ CqlConfigHelper.getSSLOptions(conf).orNull());
}
private static Collection<InetSocketAddress>
resolveHostAddresses(Configuration conf)
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 3a47a72..f9a6f3a 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -88,7 +88,6 @@ public class CqlConfigHelper
private static final String OUTPUT_CQL = "cassandra.output.cql";
private static final String OUTPUT_NATIVE_PORT =
"cassandra.output.native.port";
- private static final String ALLOW_SERVER_PORT_DISCOVERY =
"cassandra.allowserverportdiscovery";
/**
* Set the CQL columns for the input of this job.
@@ -652,15 +651,4 @@ public class CqlConfigHelper
new SecureRandom());
return ctx;
}
-
- public static void setAllowServerPortDiscovery(Configuration conf, boolean
allowServerPortDiscovery)
- {
- conf.set(ALLOW_SERVER_PORT_DISCOVERY,
Boolean.toString(allowServerPortDiscovery));
- }
-
- public static boolean getAllowServerPortDiscovery(Configuration conf)
- {
- return Boolean.parseBoolean(conf.get(ALLOW_SERVER_PORT_DISCOVERY,
"false"));
- }
-
}
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 60fc3ba..336e2ea 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -487,7 +487,7 @@ public class SecondaryIndexManager implements
IndexRegistry, INotificationConsum
builtIndexes.addAll(groupedIndexes);
build.set(o);
}
- });
+ }, MoreExecutors.directExecutor());
futures.add(build);
});
@@ -1444,7 +1444,7 @@ public class SecondaryIndexManager implements
IndexRegistry, INotificationConsum
if (null != task)
{
ListenableFuture<?> f = blockingExecutor.submit(task);
- if (callback != null) Futures.addCallback(f, callback);
+ if (callback != null) Futures.addCallback(f, callback,
MoreExecutors.directExecutor());
FBUtilities.waitOnFuture(f);
}
}
@@ -1464,7 +1464,7 @@ public class SecondaryIndexManager implements
IndexRegistry, INotificationConsum
if (null != task)
{
ListenableFuture<?> f =
blockingExecutor.submit(task);
- if (callback != null) Futures.addCallback(f,
callback);
+ if (callback != null) Futures.addCallback(f,
callback, MoreExecutors.directExecutor());
waitFor.add(f);
}
});
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
index 38a7f57..14f6602 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
@@ -57,7 +57,7 @@ public interface SSTableFormat
{
//Since format comes right after generation
//we disallow formats with numeric names
- assert !CharMatcher.DIGIT.matchesAllOf(name);
+ assert !CharMatcher.digit().matchesAllOf(name);
this.name = name;
this.info = info;
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java
b/src/java/org/apache/cassandra/repair/RepairJob.java
index f682bfb..3740070 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -391,7 +391,7 @@ public class RepairJob extends AbstractFuture<RepairResult>
implements Runnable
// failure is handled at root of job chain
public void onFailure(Throwable t) {}
- });
+ }, MoreExecutors.directExecutor());
currentTask = nextTask;
}
// start running tasks
@@ -448,7 +448,7 @@ public class RepairJob extends AbstractFuture<RepairResult>
implements Runnable
// failure is handled at root of job chain
public void onFailure(Throwable t) {}
- });
+ }, MoreExecutors.directExecutor());
currentTask = nextTask;
}
// start running tasks
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 7e931e8..f1f12dd 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -388,7 +388,7 @@ public class RepairRunnable extends WrappedRunnable
implements ProgressEventNoti
{
ranges.addAll(range);
}
- Futures.addCallback(repairResult, new
RepairCompleteCallback(parentSession, ranges, startTime, traceState,
hasFailure, executor));
+ Futures.addCallback(repairResult, new
RepairCompleteCallback(parentSession, ranges, startTime, traceState,
hasFailure, executor), MoreExecutors.directExecutor());
}
private void previewRepair(UUID parentSession,
@@ -463,7 +463,7 @@ public class RepairRunnable extends WrappedRunnable
implements ProgressEventNoti
executor.shutdownNow();
return message;
}
- });
+ }, MoreExecutors.directExecutor());
}
private ListenableFuture<List<RepairSessionResult>>
submitRepairSessions(UUID parentSession,
@@ -493,7 +493,7 @@ public class RepairRunnable extends WrappedRunnable
implements ProgressEventNoti
cfnames);
if (session == null)
continue;
- Futures.addCallback(session, new RepairSessionCallback(session));
+ Futures.addCallback(session, new RepairSessionCallback(session),
MoreExecutors.directExecutor());
futures.add(session);
}
return Futures.successfulAsList(futures);
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java
b/src/java/org/apache/cassandra/repair/RepairSession.java
index 40f3dbe..3483e59 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -344,7 +344,7 @@ public class RepairSession extends
AbstractFuture<RepairSessionResult> implement
Tracing.traceRepair("Session completed with the following
error: {}", t);
forceShutdown(t);
}
- });
+ }, MoreExecutors.directExecutor());
}
public void terminate()
diff --git
a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index 8f1759a..39549bd 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -382,7 +382,7 @@ public class CoordinatorSession extends ConsistentSession
resultFuture.setException(t);
}
}
- });
+ }, MoreExecutors.directExecutor());
return resultFuture;
}
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 935bba8..6475794 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -48,6 +48,7 @@ import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.slf4j.Logger;
@@ -666,7 +667,7 @@ public class LocalSessions
executor.shutdown();
}
}
- });
+ }, MoreExecutors.directExecutor());
}
public void maybeSetRepairing(UUID sessionID)
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index a7c125c..777e40e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1584,7 +1584,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
{
logger.warn("Error during bootstrap.", e);
}
- });
+ }, MoreExecutors.directExecutor());
try
{
bootstrapStream.get();
@@ -1679,7 +1679,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
progressSupport.progress("bootstrap", new
ProgressEvent(ProgressEventType.ERROR, 1, 1, message));
progressSupport.progress("bootstrap", new
ProgressEvent(ProgressEventType.COMPLETE, 1, 1, "Resume bootstrap complete"));
}
- });
+ }, MoreExecutors.directExecutor());
return true;
}
else
@@ -2958,7 +2958,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
// We still want to send the notification
sendReplicationNotification(notifyEndpoint);
}
- });
+ }, MoreExecutors.directExecutor());
}
/**
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 4de63be..3268ecf 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,7 +146,7 @@ public final class StreamResultFuture extends
AbstractFuture<StreamState>
public void addEventListener(StreamEventHandler listener)
{
- Futures.addCallback(this, listener);
+ Futures.addCallback(this, listener, MoreExecutors.directExecutor());
eventListeners.add(listener);
}
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java
b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 2ca2a3d..42c2bf3 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -57,13 +57,11 @@ public class BulkLoader
options.directory.getAbsoluteFile(),
new ExternalClient(
options.hosts,
- options.nativePort,
options.storagePort,
options.authProvider,
options.sslStoragePort,
options.serverEncOptions,
- buildSSLOptions(options.clientEncOptions),
- options.allowServerPortDiscovery),
+ buildSSLOptions(options.clientEncOptions)),
handler,
options.connectionsPerHost,
options.targetKeyspace);
@@ -275,15 +273,13 @@ public class BulkLoader
private final EncryptionOptions.ServerEncryptionOptions
serverEncOptions;
public ExternalClient(Set<InetSocketAddress> hosts,
- int nativePort,
int storagePort,
AuthProvider authProvider,
int sslStoragePort,
EncryptionOptions.ServerEncryptionOptions
serverEncryptionOptions,
- SSLOptions sslOptions,
- boolean allowServerPortDiscovery)
+ SSLOptions sslOptions)
{
- super(hosts, nativePort, storagePort, authProvider, sslOptions,
allowServerPortDiscovery);
+ super(hosts, storagePort, authProvider, sslOptions);
this.sslStoragePort = sslStoragePort;
serverEncOptions = serverEncryptionOptions;
}
diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java
b/src/java/org/apache/cassandra/tools/LoaderOptions.java
index 7ad3299..9acf8a5 100644
--- a/src/java/org/apache/cassandra/tools/LoaderOptions.java
+++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java
@@ -59,7 +59,6 @@ public class LoaderOptions
public static final String THROTTLE_MBITS = "throttle";
public static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle";
public static final String TOOL_NAME = "sstableloader";
- public static final String ALLOW_SERVER_PORT_DISCOVERY_OPTION =
"server-port-discovery";
public static final String TARGET_KEYSPACE = "target-keyspace";
/* client encryption options */
@@ -89,7 +88,6 @@ public class LoaderOptions
public final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
public final Set<InetSocketAddress> hosts;
public final Set<InetAddressAndPort> ignores;
- public final boolean allowServerPortDiscovery;
public final String targetKeyspace;
LoaderOptions(Builder builder)
@@ -110,7 +108,6 @@ public class LoaderOptions
connectionsPerHost = builder.connectionsPerHost;
serverEncOptions = builder.serverEncOptions;
hosts = builder.hosts;
- allowServerPortDiscovery = builder.allowServerPortDiscovery;
ignores = builder.ignores;
targetKeyspace = builder.targetKeyspace;
}
@@ -137,7 +134,6 @@ public class LoaderOptions
Set<InetAddress> ignoresArg = new HashSet<>();
Set<InetSocketAddress> hosts = new HashSet<>();
Set<InetAddressAndPort> ignores = new HashSet<>();
- boolean allowServerPortDiscovery;
String targetKeyspace;
Builder()
@@ -307,12 +303,6 @@ public class LoaderOptions
return this;
}
- public Builder allowServerPortDiscovery(boolean
allowServerPortDiscovery)
- {
- this.allowServerPortDiscovery = allowServerPortDiscovery;
- return this;
- }
-
public Builder parseArgs(String cmdArgs[])
{
CommandLineParser parser = new GnuParser();
@@ -359,7 +349,6 @@ public class LoaderOptions
verbose = cmd.hasOption(VERBOSE_OPTION);
noProgress = cmd.hasOption(NOPROGRESS_OPTION);
- allowServerPortDiscovery =
cmd.hasOption(ALLOW_SERVER_PORT_DISCOVERY_OPTION);
if (cmd.hasOption(USER_OPTION))
{
@@ -376,6 +365,32 @@ public class LoaderOptions
authProviderName =
cmd.getOptionValue(AUTH_PROVIDER_OPTION);
}
+ // try to load config file first, so that values can be
+ // rewritten with other option values.
+ // otherwise use default config.
+ Config config;
+ if (cmd.hasOption(CONFIG_PATH))
+ {
+ File configFile = new
File(cmd.getOptionValue(CONFIG_PATH));
+ if (!configFile.exists())
+ {
+ errorMsg("Config file not found", options);
+ }
+ config = new
YamlConfigurationLoader().loadConfig(configFile.toURI().toURL());
+ }
+ else
+ {
+ config = new Config();
+ // unthrottle stream by default
+ config.stream_throughput_outbound_megabits_per_sec = 0;
+
config.inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
+ }
+
+ if (cmd.hasOption(NATIVE_PORT_OPTION))
+ nativePort =
Integer.parseInt(cmd.getOptionValue(NATIVE_PORT_OPTION));
+ else
+ nativePort = config.native_transport_port;
+
if (cmd.hasOption(INITIAL_HOST_ADDRESS_OPTION))
{
String[] nodes =
cmd.getOptionValue(INITIAL_HOST_ADDRESS_OPTION).split(",");
@@ -398,6 +413,11 @@ public class LoaderOptions
System.exit(1);
}
+ if (cmd.hasOption(STORAGE_PORT_OPTION))
+ storagePort =
Integer.parseInt(cmd.getOptionValue(STORAGE_PORT_OPTION));
+ else
+ storagePort = config.storage_port;
+
if (cmd.hasOption(IGNORE_NODES_OPTION))
{
String[] nodes =
cmd.getOptionValue(IGNORE_NODES_OPTION).split(",");
@@ -418,35 +438,6 @@ public class LoaderOptions
connectionsPerHost =
Integer.parseInt(cmd.getOptionValue(CONNECTIONS_PER_HOST));
}
- // try to load config file first, so that values can be
- // rewritten with other option values.
- // otherwise use default config.
- Config config;
- if (cmd.hasOption(CONFIG_PATH))
- {
- File configFile = new
File(cmd.getOptionValue(CONFIG_PATH));
- if (!configFile.exists())
- {
- errorMsg("Config file not found", options);
- }
- config = new
YamlConfigurationLoader().loadConfig(configFile.toURI().toURL());
- }
- else
- {
- config = new Config();
- // unthrottle stream by default
- config.stream_throughput_outbound_megabits_per_sec = 0;
-
config.inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
- }
-
- if (cmd.hasOption(NATIVE_PORT_OPTION))
- nativePort =
Integer.parseInt(cmd.getOptionValue(NATIVE_PORT_OPTION));
- else
- nativePort = config.native_transport_port;
- if (cmd.hasOption(STORAGE_PORT_OPTION))
- storagePort =
Integer.parseInt(cmd.getOptionValue(STORAGE_PORT_OPTION));
- else
- storagePort = config.storage_port;
if (cmd.hasOption(SSL_STORAGE_PORT_OPTION))
sslStoragePort =
Integer.parseInt(cmd.getOptionValue(SSL_STORAGE_PORT_OPTION));
else
@@ -626,7 +617,6 @@ public class LoaderOptions
options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "Client SSL:
type of store");
options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES",
"Client SSL: comma-separated list of encryption suites to use");
options.addOption("f", CONFIG_PATH, "path to config file",
"cassandra.yaml file path for streaming throughput and client/server SSL.");
- options.addOption("spd", ALLOW_SERVER_PORT_DISCOVERY_OPTION, "allow
server port discovery", "Use ports published by server to decide how to
connect. With SSL requires StartTLS to be used.");
options.addOption("k", TARGET_KEYSPACE, "target keyspace name",
"target keyspace name");
return options;
}
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index bb0ee25..9763d7e 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -46,37 +46,28 @@ public class NativeSSTableLoaderClient extends
SSTableLoader.Client
{
protected final Map<String, TableMetadataRef> tables;
private final Collection<InetSocketAddress> hosts;
- private final int port;
private final int storagePort;
private final AuthProvider authProvider;
private final SSLOptions sslOptions;
- private final boolean allowServerPortDiscovery;
-
- public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int
nativePort, int storagePort, String username, String password, SSLOptions
sslOptions, boolean allowServerPortDiscovery)
+ public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int
storagePort, String username, String password, SSLOptions sslOptions)
{
- this(hosts, nativePort, storagePort, new
PlainTextAuthProvider(username, password), sslOptions,
allowServerPortDiscovery);
+ this(hosts, storagePort, new PlainTextAuthProvider(username,
password), sslOptions);
}
- public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int
nativePort, int storagePort, AuthProvider authProvider, SSLOptions sslOptions,
boolean allowServerPortDiscovery)
+ public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int
storagePort, AuthProvider authProvider, SSLOptions sslOptions)
{
super();
this.tables = new HashMap<>();
this.hosts = hosts;
- this.port = nativePort;
this.authProvider = authProvider;
this.sslOptions = sslOptions;
- this.allowServerPortDiscovery = allowServerPortDiscovery;
this.storagePort = storagePort;
}
public void init(String keyspace)
{
- Set<InetAddress> hostAddresses = hosts.stream().map(host ->
host.getAddress()).collect(Collectors.toSet());
- Cluster.Builder builder =
Cluster.builder().addContactPoints(hostAddresses).withPort(port).allowBetaProtocolVersion();
-
- if (allowServerPortDiscovery)
- builder = builder.allowServerPortDiscovery();
+ Cluster.Builder builder =
Cluster.builder().addContactPointsWithPorts(hosts).allowBetaProtocolVersion();
if (sslOptions != null)
builder.withSSL(sslOptions);
@@ -100,15 +91,9 @@ public class NativeSSTableLoaderClient extends
SSTableLoader.Client
tokenFactory.fromString(tokenRange.getEnd().getValue().toString()));
for (Host endpoint : endpoints)
{
- int portToUse;
- if (allowServerPortDiscovery)
- {
- portToUse =
endpoint.getBroadcastAddressOptPort().portOrElse(storagePort);
- }
- else
- {
- portToUse = storagePort;
- }
+ int broadcastPort =
endpoint.getBroadcastSocketAddress().getPort();
+ // use port from broadcast address if set.
+ int portToUse = broadcastPort != 0 ? broadcastPort :
storagePort;
addRangeForEndpoint(range,
InetAddressAndPort.getByNameOverrideDefaults(endpoint.getAddress().getHostAddress(),
portToUse));
}
}
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 9c4f22e..8f3a52a 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -47,6 +47,8 @@ import com.datastax.driver.core.ResultSet;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -401,6 +403,8 @@ public abstract class CQLTester
return;
SystemKeyspace.finishStartup();
+
VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
+
StorageService.instance.initServer();
SchemaLoader.startGossiper();
diff --git
a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index b140813..6bef001 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -552,7 +552,7 @@ public class PendingAntiCompactionTest extends
AbstractPendingAntiCompactionTest
public void onFailure(Throwable throwable)
{
}
- });
+ }, MoreExecutors.directExecutor());
assertTrue(cdl.await(1, TimeUnit.MINUTES));
}
}
diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
index 7778331..dddf336 100644
--- a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -129,7 +130,7 @@ public class HintsServiceTest
{
HintsService.instance.resumeDispatch();
}
- });
+ }, MoreExecutors.directExecutor());
Futures.allAsList(
noMessagesWhilePaused,
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
index bf4c226..c61c301 100644
--- a/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
+++ b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +54,7 @@ public class MockMessagingSpy
*/
public ListenableFuture<Message<?>> captureMockedMessage()
{
- return Futures.transform(captureMockedMessageN(1), (List<Message<?>>
result) -> result.isEmpty() ? null : result.get(0));
+ return Futures.transform(captureMockedMessageN(1), (List<Message<?>>
result) -> result.isEmpty() ? null : result.get(0),
MoreExecutors.directExecutor());
}
/**
@@ -89,7 +90,7 @@ public class MockMessagingSpy
*/
public ListenableFuture<Message<?>> captureMessageOut()
{
- return Futures.transform(captureMessageOut(1), (List<Message<?>>
result) -> result.isEmpty() ? null : result.get(0));
+ return Futures.transform(captureMessageOut(1), (List<Message<?>>
result) -> result.isEmpty() ? null : result.get(0),
MoreExecutors.directExecutor());
}
/**
@@ -231,4 +232,4 @@ public class MockMessagingSpy
}
}
}
-}
+}
\ No newline at end of file
diff --git
a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 909e221..d88f379 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -130,7 +132,7 @@ public class StreamingTransferTest
{
fail();
}
- });
+ }, MoreExecutors.directExecutor());
// should be complete immediately
futureResult.get(100, TimeUnit.MILLISECONDS);
}
diff --git a/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java
b/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java
index 104f288..6ed38a0 100644
--- a/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java
+++ b/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java
@@ -63,4 +63,76 @@ public class BulkLoaderTest extends ToolsTester
assertKeyspaceNotLoaded();
assertServerNotLoaded();
}
+
+ @Test
+ public void testBulkLoader_WithArgs1() throws Exception
+ {
+ try
+ {
+ runTool(0, "org.apache.cassandra.tools.BulkLoader", "-d",
"127.9.9.1", "--port", "9042", sstableDirName("legacy_sstables",
"legacy_ma_simple"));
+ fail();
+ }
+ catch (RuntimeException e)
+ {
+ if (!(e.getCause() instanceof BulkLoadException))
+ throw e;
+ if (!(e.getCause().getCause() instanceof NoHostAvailableException))
+ throw e;
+ }
+ assertNoUnexpectedThreadsStarted(null, new
String[]{"globalEventExecutor-1-1", "globalEventExecutor-1-2"});
+ assertSchemaNotLoaded();
+ assertCLSMNotLoaded();
+ assertSystemKSNotLoaded();
+ assertKeyspaceNotLoaded();
+ assertServerNotLoaded();
+ }
+
+ @Test
+ public void testBulkLoader_WithArgs2() throws Exception
+ {
+ try
+ {
+ runTool(0, "org.apache.cassandra.tools.BulkLoader", "-d",
"127.9.9.1:9042", "--port", "9041", sstableDirName("legacy_sstables",
"legacy_ma_simple"));
+ fail();
+ }
+ catch (RuntimeException e)
+ {
+ if (!(e.getCause() instanceof BulkLoadException))
+ throw e;
+ if (!(e.getCause().getCause() instanceof NoHostAvailableException))
+ throw e;
+ }
+ assertNoUnexpectedThreadsStarted(null, new
String[]{"globalEventExecutor-1-1", "globalEventExecutor-1-2"});
+ assertSchemaNotLoaded();
+ assertCLSMNotLoaded();
+ assertSystemKSNotLoaded();
+ assertKeyspaceNotLoaded();
+ assertServerNotLoaded();
+ }
+
+ @Test(expected = NoHostAvailableException.class)
+ public void testBulkLoader_WithArgs3() throws Throwable
+ {
+ try
+ {
+ runTool(1, "org.apache.cassandra.tools.BulkLoader", "-d",
"127.9.9.1", "--port", "9041", sstableDirName("legacy_sstables",
"legacy_ma_simple"));
+ }
+ catch (RuntimeException e)
+ {
+ throw e.getCause().getCause();
+ }
+ }
+
+ @Test(expected = NoHostAvailableException.class)
+ public void testBulkLoader_WithArgs4() throws Throwable
+ {
+ try
+ {
+ runTool(1, "org.apache.cassandra.tools.BulkLoader", "-d",
"127.9.9.1:9041", sstableDirName("legacy_sstables", "legacy_ma_simple"));
+ }
+ catch (RuntimeException e)
+ {
+ throw e.getCause().getCause();
+ }
+ }
}
diff --git
a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
index 24c10bf..8a484c7 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
@@ -39,7 +39,6 @@ public class SettingsNode implements Serializable
public final List<String> nodes;
public final boolean isWhiteList;
public final String datacenter;
- public final boolean allowServerPortDiscovery;
public SettingsNode(Options options)
{
@@ -72,7 +71,6 @@ public class SettingsNode implements Serializable
isWhiteList = options.whitelist.setByUser();
datacenter = options.datacenter.value();
- allowServerPortDiscovery =
options.allowServerPortDiscovery.setByUser();
}
public Set<String> resolveAllPermitted(StressSettings settings)
@@ -145,13 +143,12 @@ public class SettingsNode implements Serializable
final OptionSimple datacenter = new OptionSimple("datacenter=", ".*",
null, "Datacenter used for DCAwareRoundRobinLoadPolicy", false);
final OptionSimple whitelist = new OptionSimple("whitelist", "", null,
"Limit communications to the provided nodes", false);
final OptionSimple file = new OptionSimple("file=", ".*", null, "Node
file (one per line)", false);
- final OptionSimple allowServerPortDiscovery = new
OptionSimple("allow_server_port_discovery", "", null, "Allow Java client to
discover server client port numbers", false);
final OptionSimple list = new OptionSimple("", "[^=,]+(,[^=,]+)*",
"localhost", "comma delimited list of nodes", false);
@Override
public List<? extends Option> options()
{
- return Arrays.asList(datacenter, whitelist, file,
allowServerPortDiscovery, list);
+ return Arrays.asList(datacenter, whitelist, file, list);
}
}
@@ -161,7 +158,6 @@ public class SettingsNode implements Serializable
out.println(" Nodes: " + nodes);
out.println(" Is White List: " + isWhiteList);
out.println(" Datacenter: " + datacenter);
- out.println(" Allow server port discovery: " +
allowServerPortDiscovery);
}
public static SettingsNode get(Map<String, String[]> clArgs)
diff --git
a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
index 36361f7..72487dc 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
@@ -54,7 +54,6 @@ public class JavaDriverClient
private Cluster cluster;
private Session session;
private final LoadBalancingPolicy loadBalancingPolicy;
- private final boolean allowServerPortDiscovery;
private static final ConcurrentMap<String, PreparedStatement> stmts = new
ConcurrentHashMap<>();
@@ -74,7 +73,6 @@ public class JavaDriverClient
this.encryptionOptions = encryptionOptions;
this.loadBalancingPolicy = loadBalancingPolicy(settings);
this.connectionsPerHost = settings.mode.connectionsPerHost == null ? 8
: settings.mode.connectionsPerHost;
- this.allowServerPortDiscovery = settings.node.allowServerPortDiscovery;
int maxThreadCount = 0;
if (settings.rate.auto)
@@ -136,8 +134,6 @@ public class JavaDriverClient
.withoutJMXReporting()
.withProtocolVersion(protocolVersion)
.withoutMetrics(); // The
driver uses metrics 3 with conflict with our version
- if (allowServerPortDiscovery)
- clusterBuilder = clusterBuilder.allowServerPortDiscovery();
if (loadBalancingPolicy != null)
clusterBuilder.withLoadBalancingPolicy(loadBalancingPolicy);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]