Merge branch 'cassandra-1.1' into cassandra-1.2
Conflicts:
src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c67e501f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c67e501f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c67e501f
Branch: refs/heads/trunk
Commit: c67e501f1196ac6c591ec162ef721b0f58f4ad0a
Parents: f07804e ffc9bec
Author: Yuki Morishita <[email protected]>
Authored: Thu Mar 14 16:00:47 2013 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Thu Mar 14 16:00:47 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 6 ------
.../org/apache/cassandra/db/MeteredFlusher.java | 2 +-
.../apache/cassandra/service/CassandraDaemon.java | 3 +++
4 files changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c67e501f/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a34b73e,89c8b10..8e4e15b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -29,89 -6,9 +29,90 @@@ Merged from 1.1
* nodetool: ability to repair specific range (CASSANDRA-5280)
* Fix possible assertion triggered in SliceFromReadCommand (CASSANDRA-5284)
* cqlsh: Add inet type support on Windows (ipv4-only) (CASSANDRA-4801)
+ * Fix race when initializing ColumnFamilyStore (CASSANDRA-5350)
+1.2.2
+ * fix potential for multiple concurrent compactions of the same sstables
+ (CASSANDRA-5256)
+ * avoid no-op caching of byte[] on commitlog append (CASSANDRA-5199)
+ * fix symlinks under data dir not working (CASSANDRA-5185)
+ * fix bug in compact storage metadata handling (CASSANDRA-5189)
+ * Validate login for USE queries (CASSANDRA-5207)
+ * cli: remove default username and password (CASSANDRA-5208)
+ * configure populate_io_cache_on_flush per-CF (CASSANDRA-4694)
+ * allow configuration of internode socket buffer (CASSANDRA-3378)
+ * Make sstable directory picking blacklist-aware again (CASSANDRA-5193)
+ * Correctly expire gossip states for edge cases (CASSANDRA-5216)
+ * Improve handling of directory creation failures (CASSANDRA-5196)
+ * Expose secondary indicies to the rest of nodetool (CASSANDRA-4464)
+ * Binary protocol: avoid sending notification for 0.0.0.0 (CASSANDRA-5227)
+ * add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366)
+ * CQL3 refactor to allow conversion function (CASSANDRA-5226)
+ * Fix drop of sstables in some circumstance (CASSANDRA-5232)
+ * Implement caching of authorization results (CASSANDRA-4295)
+ * Add support for LZ4 compression (CASSANDRA-5038)
+ * Fix missing columns in wide rows queries (CASSANDRA-5225)
+ * Simplify auth setup and make system_auth ks alterable (CASSANDRA-5112)
+ * Stop compactions from hanging during bootstrap (CASSANDRA-5244)
+ * fix compressed streaming sending extra chunk (CASSANDRA-5105)
+ * Add CQL3-based implementations of IAuthenticator and IAuthorizer
+ (CASSANDRA-4898)
+ * Fix timestamp-based tomstone removal logic (CASSANDRA-5248)
+ * cli: Add JMX authentication support (CASSANDRA-5080)
+ * Fix forceFlush behavior (CASSANDRA-5241)
+ * cqlsh: Add username autocompletion (CASSANDRA-5231)
+ * Fix CQL3 composite partition key error (CASSANDRA-5240)
+ * Allow IN clause on last clustering key (CASSANDRA-5230)
+
+
+1.2.1
+ * stream undelivered hints on decommission (CASSANDRA-5128)
+ * GossipingPropertyFileSnitch loads saved dc/rack info if needed (CASSANDRA-5133)
+ * drain should flush system CFs too (CASSANDRA-4446)
+ * add inter_dc_tcp_nodelay setting (CASSANDRA-5148)
+ * re-allow wrapping ranges for start_token/end_token range pairing (CASSANDRA-5106)
+ * fix validation compaction of empty rows (CASSADRA-5136)
+ * nodetool methods to enable/disable hint storage/delivery (CASSANDRA-4750)
+ * disallow bloom filter false positive chance of 0 (CASSANDRA-5013)
+ * add threadpool size adjustment methods to JMXEnabledThreadPoolExecutor and
+ CompactionManagerMBean (CASSANDRA-5044)
+ * fix hinting for dropped local writes (CASSANDRA-4753)
+ * off-heap cache doesn't need mutable column container (CASSANDRA-5057)
+ * apply disk_failure_policy to bad disks on initial directory creation
+ (CASSANDRA-4847)
+ * Optimize name-based queries to use ArrayBackedSortedColumns (CASSANDRA-5043)
+ * Fall back to old manifest if most recent is unparseable (CASSANDRA-5041)
+ * pool [Compressed]RandomAccessReader objects on the partitioned read path
+ (CASSANDRA-4942)
+ * Add debug logging to list filenames processed by Directories.migrateFile
+ method (CASSANDRA-4939)
+ * Expose black-listed directories via JMX (CASSANDRA-4848)
+ * Log compaction merge counts (CASSANDRA-4894)
+ * Minimize byte array allocation by AbstractData{Input,Output} (CASSANDRA-5090)
+ * Add SSL support for the binary protocol (CASSANDRA-5031)
+ * Allow non-schema system ks modification for shuffle to work (CASSANDRA-5097)
+ * cqlsh: Add default limit to SELECT statements (CASSANDRA-4972)
+ * cqlsh: fix DESCRIBE for 1.1 cfs in CQL3 (CASSANDRA-5101)
+ * Correctly gossip with nodes >= 1.1.7 (CASSANDRA-5102)
+ * Ensure CL guarantees on digest mismatch (CASSANDRA-5113)
+ * Validate correctly selects on composite partition key (CASSANDRA-5122)
+ * Fix exception when adding collection (CASSANDRA-5117)
+ * Handle states for non-vnode clusters correctly (CASSANDRA-5127)
+ * Refuse unrecognized replication and compaction strategy options (CASSANDRA-4795)
+ * Pick the correct value validator in sstable2json for cql3 tables (CASSANDRA-5134)
+ * Validate login for describe_keyspace, describe_keyspaces and set_keyspace
+ (CASSANDRA-5144)
+ * Fix inserting empty maps (CASSANDRA-5141)
+ * Don't remove tokens from System table for node we know (CASSANDRA-5121)
+ * fix streaming progress report for compresed files (CASSANDRA-5130)
+ * Coverage analysis for low-CL queries (CASSANDRA-4858)
+ * Stop interpreting dates as valid timeUUID value (CASSANDRA-4936)
+ * Adds E notation for floating point numbers (CASSANDRA-4927)
+ * Detect (and warn) unintentional use of the cql2 thrift methods when cql3 was
+ intended (CASSANDRA-5172)
+
+
1.1.10
* fix saved key cache not loading at startup (CASSANDRA-5166)
* fix ConcurrentModificationException in getBootstrapSource (CASSANDRA-5170)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c67e501f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c67e501f/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/MeteredFlusher.java
index 7984944,199d2ee..408727c
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@@ -29,9 -33,9 +29,9 @@@ import org.slf4j.LoggerFactory
import org.apache.cassandra.config.DatabaseDescriptor;
- class MeteredFlusher implements Runnable
+ public class MeteredFlusher implements Runnable
{
- private static Logger logger =
LoggerFactory.getLogger(MeteredFlusher.class);
+ private static final Logger logger =
LoggerFactory.getLogger(MeteredFlusher.class);
public void run()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c67e501f/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index 8e617c4,6b048b5..e01abaa
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -7,328 -9,27 +7,331 @@@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
+package org.apache.cassandra.service;
-
+import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Iterables;
+import org.apache.log4j.PropertyConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.thrift.ThriftServer;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.Mx4jTool;
/**
- * The <code>CassandraDaemon</code> interface captures the lifecycle of a
- * Cassandra daemon that runs on a single node.
- *
+ * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon
+ * service, which defines not only a way to activate and deactivate it, but also
+ * hooks into its lifecycle methods (see {@link #setup()}, {@link #start()},
+ * {@link #stop()} and {@link #setup()}).
*/
-public interface CassandraDaemon
+public class CassandraDaemon
{
+ static
+ {
+ initLog4j();
+ }
+
+ /**
+ * Initialize logging in such a way that it checks for config changes
every 10 seconds.
+ */
+ public static void initLog4j()
+ {
+ if
(System.getProperty("log4j.defaultInitOverride","false").equalsIgnoreCase("true"))
+ {
+ String config = System.getProperty("log4j.configuration",
"log4j-server.properties");
+ URL configLocation = null;
+ try
+ {
+ // try loading from a physical location first.
+ configLocation = new URL(config);
+ }
+ catch (MalformedURLException ex)
+ {
+ // then try loading from the classpath.
+ configLocation =
CassandraDaemon.class.getClassLoader().getResource(config);
+ }
+
+ if (configLocation == null)
+ throw new RuntimeException("Couldn't figure out log4j
configuration: "+config);
+
+ // Now convert URL to a filename
+ String configFileName = null;
+ try
+ {
+ // first try URL.getFile() which works for opaque URLs
(file:foo) and paths without spaces
+ configFileName = configLocation.getFile();
+ File configFile = new File(configFileName);
+ // then try alternative approach which works for all
hierarchical URLs with or without spaces
+ if (!configFile.exists())
+ configFileName = new
File(configLocation.toURI()).getCanonicalPath();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Couldn't convert log4j
configuration location to a valid file", e);
+ }
+
+ PropertyConfigurator.configureAndWatch(configFileName, 10000);
+
org.apache.log4j.Logger.getLogger(CassandraDaemon.class).info("Logging
initialized");
+ }
+ }
+
+ private static final Logger logger =
LoggerFactory.getLogger(CassandraDaemon.class);
+
+ private static final CassandraDaemon instance = new CassandraDaemon();
+
+ static final AtomicInteger exceptions = new AtomicInteger();
+
+ public Server thriftServer;
+ public Server nativeServer;
+
+ /**
+ * This is a hook for concrete daemons to initialize themselves suitably.
+ *
+ * Subclasses should override this to finish the job (listening on ports,
etc.)
+ *
+ * @throws IOException
+ */
+ protected void setup()
+ {
+ // log warnings for different kinds of sub-optimal JVMs. tldr use
64-bit Oracle >= 1.6u32
+ if (!System.getProperty("os.arch").contains("64"))
+ logger.info("32bit JVM detected. It is recommended to run
Cassandra on a 64bit JVM for better performance.");
+ String javaVersion = System.getProperty("java.version");
+ String javaVmName = System.getProperty("java.vm.name");
+ logger.info("JVM vendor/version: {}/{}", javaVmName, javaVersion);
+ if (javaVmName.contains("OpenJDK"))
+ {
+ // There is essentially no QA done on OpenJDK builds, and
+ // clusters running OpenJDK have seen many heap and load issues.
+ logger.warn("OpenJDK is not recommended. Please upgrade to the
newest Oracle Java release");
+ }
+ else if (!javaVmName.contains("HotSpot"))
+ {
+ logger.warn("Non-Oracle JVM detected. Some features, such as
immediate unmap of compacted SSTables, may not work as intended");
+ }
+ else
+ {
+ String[] java_version = javaVersion.split("_");
+ String java_major = java_version[0];
+ int java_minor = (java_version.length > 1) ?
Integer.parseInt(java_version[1]) : 0;
+ if (java_major.equals("1.6.0"))
+ {
+ // These need to be updated from time to time, but these are
currently valid (12.18.2012)
+ if (java_minor < 29)
+ // Seen to be a major contributing factor for heap and
load issues
+ logger.error("Your JVM is out of date. Please upgrade to
the newest Oracle Java 6.");
+ else if (java_minor < 32)
+ // Updates 32+ have been seen to work well enough in the
wild
+ logger.warn("Your JVM is out of date. Please upgrade to
the newest Oracle Java 6.");
+ }
+ }
+
+ logger.info("Heap size: {}/{}", Runtime.getRuntime().totalMemory(),
Runtime.getRuntime().maxMemory());
+ logger.info("Classpath: {}", System.getProperty("java.class.path"));
+ CLibrary.tryMlockall();
+
+ Thread.setDefaultUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler()
+ {
+ public void uncaughtException(Thread t, Throwable e)
+ {
+ exceptions.incrementAndGet();
+ logger.error("Exception in thread " + t, e);
+ for (Throwable e2 = e; e2 != null; e2 = e2.getCause())
+ {
+ // some code, like FileChannel.map, will wrap an
OutOfMemoryError in another exception
+ if (e2 instanceof OutOfMemoryError)
+ System.exit(100);
+
+ if (e2 instanceof FSError)
+ {
+ if (e2 != e) // make sure FSError gets logged exactly
once.
+ logger.error("Exception in thread " + t, e2);
+ FileUtils.handleFSError((FSError) e2);
+ }
+ }
+ }
+ });
+
+ // check all directories(data, commitlog, saved cache) for existence
and permission
+ Iterable<String> dirs =
Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
+
Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
+
DatabaseDescriptor.getSavedCachesLocation()));
+ for (String dataDir : dirs)
+ {
+ logger.debug("Checking directory {}", dataDir);
+ File dir = new File(dataDir);
+ if (dir.exists())
+ assert dir.isDirectory() && dir.canRead() && dir.canWrite()
&& dir.canExecute()
+ : String.format("Directory %s is not accessible.",
dataDir);
+ }
+
+ // Migrate sstables from pre-#2749 to the correct location
+ if (Directories.sstablesNeedsMigration())
+ Directories.migrateSSTables();
+
+ if (CacheService.instance == null) // should never happen
+ throw new RuntimeException("Failed to initialize Cache Service.");
+
+ // check the system table to keep user from shooting self in foot by
changing partitioner, cluster name, etc.
+ // we do a one-off scrub of the system table first; we can't load the
list of the rest of the tables,
+ // until system table is opened.
+ for (CFMetaData cfm :
Schema.instance.getTableMetaData(Table.SYSTEM_KS).values())
+ ColumnFamilyStore.scrubDataDirectories(Table.SYSTEM_KS,
cfm.cfName);
+ try
+ {
+ SystemTable.checkHealth();
+ }
+ catch (ConfigurationException e)
+ {
+ logger.error("Fatal exception during initialization", e);
+ System.exit(100);
+ }
+
+ // load keyspace descriptions.
+ try
+ {
+ DatabaseDescriptor.loadSchemas();
+ }
+ catch (IOException e)
+ {
+ logger.error("Fatal exception during initialization", e);
+ System.exit(100);
+ }
+
+ // clean up debris in the rest of the tables
+ for (String table : Schema.instance.getTables())
+ {
+ for (CFMetaData cfm :
Schema.instance.getTableMetaData(table).values())
+ {
+ ColumnFamilyStore.scrubDataDirectories(table, cfm.cfName);
+ }
+ }
+
+ // initialize keyspaces
+ for (String table : Schema.instance.getTables())
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("opening keyspace " + table);
+ // disable auto compaction until commit log replay ends
+ for (ColumnFamilyStore cfs :
Table.open(table).getColumnFamilyStores())
+ {
+ for (ColumnFamilyStore store : cfs.concatWithIndexes())
+ {
+ store.disableAutoCompaction();
+ }
+ }
+ }
+
+ if (CacheService.instance.keyCache.size() > 0)
+ logger.info("completed pre-loading ({} keys) key cache.",
CacheService.instance.keyCache.size());
+
+ if (CacheService.instance.rowCache.size() > 0)
+ logger.info("completed pre-loading ({} keys) row cache.",
CacheService.instance.rowCache.size());
+
+ try
+ {
+ GCInspector.instance.start();
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Unable to start GCInspector (currently only
supported on the Sun JVM)");
+ }
+
+ // replay the log if necessary
+ try
+ {
+ CommitLog.instance.recover();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ // enable auto compaction
+ for (Table table : Table.all())
+ {
+ for (ColumnFamilyStore cfs : table.getColumnFamilyStores())
+ {
+ for (final ColumnFamilyStore store : cfs.concatWithIndexes())
+ {
+ store.enableAutoCompaction();
+ }
+ }
+ }
+ // start compactions in five minutes (if no flushes have occurred by
then to do so)
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ for (Table table : Table.all())
+ {
+ for (ColumnFamilyStore cf : table.getColumnFamilyStores())
+ {
+ for (ColumnFamilyStore store : cf.concatWithIndexes())
+
CompactionManager.instance.submitBackground(store);
+ }
+ }
+ }
+ };
+ StorageService.optionalTasks.schedule(runnable, 5 * 60,
TimeUnit.SECONDS);
+
++ // MeteredFlusher can block if flush queue fills up, so don't put on
scheduledTasks
++ StorageService.optionalTasks.scheduleWithFixedDelay(new
MeteredFlusher(), 1000, 1000, TimeUnit.MILLISECONDS);
++
+ SystemTable.finishStartup();
+
+ // start server internals
+ StorageService.instance.registerDaemon(this);
+ try
+ {
+ StorageService.instance.initServer();
+ }
+ catch (ConfigurationException e)
+ {
+ logger.error("Fatal configuration error", e);
+ System.err.println(e.getMessage() + "\nFatal configuration error;
unable to start server. See log for stacktrace.");
+ System.exit(1);
+ }
+
+ Mx4jTool.maybeLoad();
+
+ // Thift
+ InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
+ int rpcPort = DatabaseDescriptor.getRpcPort();
+ thriftServer = new ThriftServer(rpcAddr, rpcPort);
+
+ // Native transport
+ InetAddress nativeAddr =
DatabaseDescriptor.getNativeTransportAddress();
+ int nativePort = DatabaseDescriptor.getNativeTransportPort();
+ nativeServer = new org.apache.cassandra.transport.Server(nativeAddr,
nativePort);
+ }
+
/**
* Initialize the Cassandra Daemon based on the given <a
* href="http://commons.apache.org/daemon/jsvc.html">Commons