Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/BatchlogManager.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/440824c1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/440824c1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/440824c1
Branch: refs/heads/trunk
Commit: 440824c1a60a344bc3e8a5ad35ae2fac879bd61d
Parents: 014d328 e916dff
Author: Aleksey Yeschenko <[email protected]>
Authored: Fri Oct 17 03:40:17 2014 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Fri Oct 17 03:40:17 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/BatchlogManager.java | 64 ++++++++++----------
.../cassandra/service/StorageService.java | 25 ++++++--
.../cassandra/db/BatchlogManagerTest.java | 8 +--
4 files changed, 57 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/440824c1/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b40e14b,73aaab0..d7a8904
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,89 -1,5 +1,90 @@@
-2.0.11:
+2.1.1
+ * Fix IllegalArgumentException when a list of IN values containing tuples
+ is passed as a single arg to a prepared statement with the v1 or v2
+ protocol (CASSANDRA-8062)
+ * Fix ClassCastException in DISTINCT query on static columns with
+ query paging (CASSANDRA-8108)
+ * Fix NPE on null nested UDT inside a set (CASSANDRA-8105)
+ * Fix exception when querying secondary index on set items or map keys
+ when some clustering columns are specified (CASSANDRA-8073)
+ * Send proper error response when there is an error during native
+ protocol message decode (CASSANDRA-8118)
+ * Gossip should ignore generation numbers too far in the future
+ (CASSANDRA-8113)
+ * Fix NPE when creating a table with frozen sets, lists (CASSANDRA-8104)
+ * Fix high memory use due to tracking reads on incrementally opened sstable
+ readers (CASSANDRA-8066)
+ * Fix EXECUTE request with skipMetadata=false returning no metadata
+ (CASSANDRA-8054)
+ * Allow concurrent use of CQLBulkOutputFormat (CASSANDRA-7776)
+ * Shutdown JVM on OOM (CASSANDRA-7507)
+ * Upgrade netty version and enable epoll event loop (CASSANDRA-7761)
+ * Don't duplicate sstables smaller than split size when using
+ the sstablesplitter tool (CASSANDRA-7616)
+ * Avoid re-parsing already prepared statements (CASSANDRA-7923)
+ * Fix some Thrift slice deletions and updates of COMPACT STORAGE
+ tables with some clustering columns omitted (CASSANDRA-7990)
+ * Fix filtering for CONTAINS on sets (CASSANDRA-8033)
+ * Properly track added size (CASSANDRA-7239)
+ * Allow compilation in java 8 (CASSANDRA-7208)
+ * Fix Assertion error on RangeTombstoneList diff (CASSANDRA-8013)
+ * Release references to overlapping sstables during compaction
+ (CASSANDRA-7819)
+ * Send notification when opening compaction results early (CASSANDRA-8034)
+ * Make native server start block until properly bound (CASSANDRA-7885)
+ * (cqlsh) Fix IPv6 support (CASSANDRA-7988)
+ * Ignore fat clients when checking for endpoint collision (CASSANDRA-7939)
+ * Make sstablerepairedset take a list of files (CASSANDRA-7995)
+ * (cqlsh) Tab completion for indexes on map keys (CASSANDRA-7972)
+ * (cqlsh) Fix UDT field selection in select clause (CASSANDRA-7891)
+ * Fix resource leak in event of corrupt sstable
+ * (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
+ * Provide visibility into prepared statements churn (CASSANDRA-7921,
+ CASSANDRA-7930)
+ * Invalidate prepared statements when their keyspace or table is
+ dropped (CASSANDRA-7566)
+ * cassandra-stress: fix support for NetworkTopologyStrategy (CASSANDRA-7945)
+ * Fix saving caches when a table is dropped (CASSANDRA-7784)
+ * Add better error checking of new stress profile (CASSANDRA-7716)
+ * Use ThreadLocalRandom and remove FBUtilities.threadLocalRandom
+ (CASSANDRA-7934)
+ * Prevent operator mistakes due to simultaneous bootstrap (CASSANDRA-7069)
+ * cassandra-stress supports whitelist mode for node config (CASSANDRA-7658)
+ * GCInspector more closely tracks GC; cassandra-stress and nodetool report
+ it (CASSANDRA-7916)
+ * nodetool won't output bogus ownership info without a keyspace
+ (CASSANDRA-7173)
+ * Add human readable option to nodetool commands (CASSANDRA-5433)
+ * Don't try to set repairedAt on old sstables (CASSANDRA-7913)
+ * Add metrics for tracking PreparedStatement use (CASSANDRA-7719)
+ * (cqlsh) tab-completion for triggers (CASSANDRA-7824)
+ * (cqlsh) Support for query paging (CASSANDRA-7514)
+ * (cqlsh) Show progress of COPY operations (CASSANDRA-7789)
+ * Add syntax to remove multiple elements from a map (CASSANDRA-6599)
+ * Support non-equals conditions in lightweight transactions (CASSANDRA-6839)
+ * Add IF [NOT] EXISTS to create/drop triggers (CASSANDRA-7606)
+ * (cqlsh) Display the current logged-in user (CASSANDRA-7785)
+ * (cqlsh) Don't ignore CTRL-C during COPY FROM execution (CASSANDRA-7815)
+ * (cqlsh) Order UDTs according to cross-type dependencies in DESCRIBE
+ output (CASSANDRA-7659)
+ * (cqlsh) Fix handling of CAS statement results (CASSANDRA-7671)
+ * (cqlsh) COPY TO/FROM improvements (CASSANDRA-7405)
+ * Support list index operations with conditions (CASSANDRA-7499)
+ * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731)
+ * Validate IPv6 wildcard addresses properly (CASSANDRA-7680)
+ * (cqlsh) Error when tracing query (CASSANDRA-7613)
+ * Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569)
+ * SSTableExport uses correct validator to create string representation of partition
+ keys (CASSANDRA-7498)
+ * Avoid NPEs when receiving type changes for an unknown keyspace
+ (CASSANDRA-7689)
+ * Add support for custom 2i validation (CASSANDRA-7575)
+ * Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
+ * Add listen_interface and rpc_interface options (CASSANDRA-7417)
+ * Improve schema merge performance (CASSANDRA-7444)
+ * Adjust MT depth based on # of partitions being validated (CASSANDRA-5263)
+ * Optimise NativeCell comparisons (CASSANDRA-6755)
+ * Configurable client timeout for cqlsh (CASSANDRA-7516)
+ * Include snippet of CQL query near syntax error in messages (CASSANDRA-7111)
+ * Make repair -pr work with -local (CASSANDRA-7450)
+ * Fix error in sstableloader with -cph > 1 (CASSANDRA-8007)
+ * Fix snapshot repair error on indexed tables (CASSANDRA-8020)
+ * Do not exit nodetool repair when receiving JMX NOTIF_LOST (CASSANDRA-7909)
+Merged from 2.0:
+ * Force batchlog replay before decommissioning a node (CASSANDRA-7446)
* Fix hint replay with many accumulated expired hints (CASSANDRA-6998)
* Fix duplicate results in DISTINCT queries on static columns with query
paging (CASSANDRA-8108)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/440824c1/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index 7f8d355,48f4c3c..18d9a17
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -38,9 -38,11 +37,8 @@@ import org.slf4j.LoggerFactory
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
--import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.WriteTimeoutException;
@@@ -117,12 -119,13 +120,13 @@@ public class BatchlogManager implement
replayAllFailedBatches();
}
};
- batchlogTasks.execute(runnable);
+ // If a replay is already in progress this request will be executed after it completes.
+ return batchlogTasks.submit(runnable);
}
- public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
+ public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
{
- return getBatchlogMutationFor(mutations, uuid, FBUtilities.timestampMicros());
+ return getBatchlogMutationFor(mutations, uuid, version, FBUtilities.timestampMicros());
}
@VisibleForTesting
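
The switch from execute() to submit() above is what lets startBatchlogReplay() hand a Future back to its callers, and the single-threaded batchlog executor is what makes the new comment true: a request issued mid-replay simply queues behind the running one. A minimal self-contained sketch of that pattern, using plain java.util.concurrent instead of Cassandra's DebuggableScheduledThreadPoolExecutor (class and method names here are illustrative only):

import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

public class ReplaySerializationSketch
{
    // A single-threaded executor serializes tasks: a submit() issued while a
    // replay is running is queued and runs after the current one completes.
    private static final ScheduledExecutorService tasks = Executors.newSingleThreadScheduledExecutor();

    public static Future<?> startReplay(Runnable replay)
    {
        // Returning the Future (rather than fire-and-forget execute()) lets a
        // caller block until this particular replay request has finished.
        return tasks.submit(replay);
    }

    public static void main(String[] args) throws Exception
    {
        Future<?> first = startReplay(() -> System.out.println("replay #1"));
        Future<?> second = startReplay(() -> System.out.println("replay #2"));
        second.get(); // with one thread and FIFO queuing, this implies #1 finished too
        tasks.shutdown();
    }
}
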
@@@ -151,15 -158,11 +155,11 @@@
throw new AssertionError(); // cannot happen.
}
- return ByteBuffer.wrap(bos.toByteArray());
+ return buf.asByteBuffer();
}
- @VisibleForTesting
- void replayAllFailedBatches() throws ExecutionException, InterruptedException
+ private void replayAllFailedBatches() throws ExecutionException, InterruptedException
{
- if (!isReplaying.compareAndSet(false, true))
- return;
-
logger.debug("Started replayAllFailedBatches");
// rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
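
The throttle arithmetic in the hunk below splits the configured batchlog replay budget evenly across all known endpoints and treats 0 as "unthrottled". A minimal sketch of the same arithmetic with Guava's RateLimiter (which this code also uses), with the DatabaseDescriptor and TokenMetadata lookups replaced by plain parameters for illustration:

import com.google.common.util.concurrent.RateLimiter;

public class ReplayThrottleSketch
{
    // totalThrottleInKB == 0 disables throttling, matching the cassandra.yaml convention.
    static RateLimiter limiterFor(int totalThrottleInKB, int endpointCount)
    {
        int throttleInKB = totalThrottleInKB / endpointCount;
        // The limiter's rate is in bytes per second; Double.MAX_VALUE effectively disables it.
        return RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
    }

    public static void main(String[] args)
    {
        RateLimiter limiter = limiterFor(1024, 4); // 1024 KB/s cluster-wide -> 256 KB/s per node
        byte[] fakeBatch = new byte[64 * 1024];
        limiter.acquire(fakeBatch.length); // blocks as needed to stay under the rate
        System.out.println("replayed " + fakeBatch.length + " bytes");
    }
}
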
@@@ -167,34 -170,27 +167,27 @@@
int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
- try
- {
- UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
- Keyspace.SYSTEM_KS,
- SystemKeyspace.BATCHLOG_CF,
- PAGE_SIZE));
-
- while (!page.isEmpty())
- {
- UUID id = processBatchlogPage(page, rateLimiter);
- UntypedResultSet page = process("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
- Keyspace.SYSTEM_KS,
- SystemKeyspace.BATCHLOG_CF,
- PAGE_SIZE);
++ UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
++ Keyspace.SYSTEM_KS,
++ SystemKeyspace.BATCHLOG_CF,
++ PAGE_SIZE));
- if (page.size() < PAGE_SIZE)
- break; // we've exhausted the batchlog, next query would be empty.
+ while (!page.isEmpty())
+ {
+ UUID id = processBatchlogPage(page, rateLimiter);
- page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
- Keyspace.SYSTEM_KS,
- SystemKeyspace.BATCHLOG_CF,
- PAGE_SIZE),
- id);
- }
+ if (page.size() < PAGE_SIZE)
+ break; // we've exhausted the batchlog, next query would be empty.
- cleanup();
- }
- finally
- {
- isReplaying.set(false);
- page = process("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
- Keyspace.SYSTEM_KS,
- SystemKeyspace.BATCHLOG_CF,
- id,
- PAGE_SIZE);
++ page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
++ Keyspace.SYSTEM_KS,
++ SystemKeyspace.BATCHLOG_CF,
++ PAGE_SIZE),
++ id);
}
+ cleanup();
+
logger.debug("Finished replayAllFailedBatches");
}
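
The merged loop above drops the old try/finally plus isReplaying CAS guard (no longer needed once requests are serialized on the executor) and pages through the batchlog with plain keyset pagination: fetch the first PAGE_SIZE rows, process them, then ask for rows whose token(id) sorts after the last processed id, stopping early when a short page signals exhaustion. The 2.1 side also binds the id with token(?) instead of formatting it into the query string. A minimal sketch of the paging shape, with the executeInternal() query replaced by a stand-in pageAfter function (illustrative, not the real API):

import java.util.List;
import java.util.UUID;
import java.util.function.Function;

public class BatchlogPagingSketch
{
    static final int PAGE_SIZE = 128;

    // pageAfter.apply(null) fetches the first page; pageAfter.apply(id) fetches the
    // next rows in token order, mirroring "WHERE token(id) > token(?) LIMIT ?".
    static void drainBatchlog(Function<UUID, List<UUID>> pageAfter)
    {
        List<UUID> page = pageAfter.apply(null);
        while (!page.isEmpty())
        {
            UUID lastProcessedId = page.get(page.size() - 1); // process the page, remember where it ended

            if (page.size() < PAGE_SIZE)
                break; // batchlog exhausted; the next query would return nothing

            page = pageAfter.apply(lastProcessedId);
        }
    }
}
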
@@@ -215,172 -202,89 +208,177 @@@
{
id = row.getUUID("id");
long writtenAt = row.getLong("written_at");
- int version = row.has("version") ? row.getInt("version") :
MessagingService.VERSION_12;
// enough time for the actual write + batchlog entry mutation
delivery (two separate requests).
- long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; //
enough time for the actual write + BM removal mutation
+ long timeout = getBatchlogTimeout();
if (System.currentTimeMillis() < writtenAt + timeout)
continue; // not ready to replay yet, might still get a
deletion.
- replayBatch(id, row.getBytes("data"), writtenAt, version,
rateLimiter);
+
+ int version = row.has("version") ? row.getInt("version") :
MessagingService.VERSION_12;
+ Batch batch = new Batch(id, writtenAt, row.getBytes("data"),
version);
+ try
+ {
+ if (batch.replay(rateLimiter) > 0)
+ {
+ batches.add(batch);
+ }
+ else
+ {
+ deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
+ totalBatchesReplayed.incrementAndGet();
+ }
+ }
+ catch (IOException e)
+ {
+ logger.warn("Skipped batch replay of {} due to {}", id, e);
+ deleteBatch(id);
+ }
}
+
+ // now waiting for all batches to complete their processing
+ // schedule hints for timed out deliveries
+ for (Batch batch : batches)
+ {
+ batch.finish();
+ deleteBatch(batch.id);
+ }
+
+ totalBatchesReplayed.addAndGet(batches.size());
+
return id;
}
+ public long getBatchlogTimeout()
+ {
+ return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+ }
+
- private void replayBatch(UUID id, ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter)
+ private static class Batch
{
- logger.debug("Replaying batch {}", id);
+ private final UUID id;
+ private final long writtenAt;
+ private final ByteBuffer data;
+ private final int version;
- try
+ private List<ReplayWriteResponseHandler> replayHandlers;
+
+ public Batch(UUID id, long writtenAt, ByteBuffer data, int version)
{
- replaySerializedMutations(data, writtenAt, version, rateLimiter);
+ this.id = id;
+ this.writtenAt = writtenAt;
+ this.data = data;
+ this.version = version;
}
- catch (IOException e)
+
+ public int replay(RateLimiter rateLimiter) throws IOException
{
- logger.warn("Skipped batch replay of {} due to {}", id, e);
- }
+ logger.debug("Replaying batch {}", id);
- deleteBatch(id);
+ List<Mutation> mutations = replayingMutations();
- totalBatchesReplayed.incrementAndGet();
- }
+ if (mutations.isEmpty())
+ return 0;
- private void deleteBatch(UUID id)
- {
- RowMutation mutation = new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id));
- mutation.delete(SystemKeyspace.BATCHLOG_CF, FBUtilities.timestampMicros());
- mutation.apply();
- }
+ int ttl = calculateHintTTL(mutations);
+ if (ttl <= 0)
+ return 0;
- private void replaySerializedMutations(ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
- {
- DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
- int size = in.readInt();
- List<RowMutation> mutations = new ArrayList<>(size);
+ replayHandlers = sendReplays(mutations, writtenAt, ttl);
+
+ rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation.
+
+ return replayHandlers.size();
+ }
+
+ public void finish()
+ {
+ for (int i = 0; i < replayHandlers.size(); i++)
+ {
+ ReplayWriteResponseHandler handler = replayHandlers.get(i);
+ try
+ {
+ handler.get();
+ }
+ catch (WriteTimeoutException e)
+ {
+ logger.debug("Timed out replaying a batched mutation to a
node, will write a hint");
+ // writing hints for the rest to hints, starting from i
+ writeHintsForUndeliveredEndpoints(i);
+ return;
+ }
+ }
+ }
- for (int i = 0; i < size; i++)
+ private List<Mutation> replayingMutations() throws IOException
{
- RowMutation mutation = RowMutation.serializer.deserialize(in, version);
+ DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
+ int size = in.readInt();
+ List<Mutation> mutations = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ {
+ Mutation mutation = Mutation.serializer.deserialize(in, version);
- // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
- // We don't abort the replay entirely b/c this can be considered a succes (truncated is same as delivered then
- // truncated.
- for (UUID cfId : mutation.getColumnFamilyIds())
- if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
- mutation = mutation.without(cfId);
+ // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
+ // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then
+ // truncated.
+ for (UUID cfId : mutation.getColumnFamilyIds())
+ if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
+ mutation = mutation.without(cfId);
- if (!mutation.isEmpty())
- mutations.add(mutation);
+ if (!mutation.isEmpty())
+ mutations.add(mutation);
+ }
+ return mutations;
}
- if (!mutations.isEmpty())
- replayMutations(mutations, writtenAt, version, rateLimiter);
- }
+ private void writeHintsForUndeliveredEndpoints(int startFrom)
+ {
+ try
+ {
+ // Here we deserialize mutations 2nd time from byte buffer.
+ // but this is ok, because timeout on batch direct delivery is rare
+ // (it can happen only several seconds until node is marked dead)
+ // so trading some cpu to keep less objects
+ List<Mutation> replayingMutations = replayingMutations();
+ for (int i = startFrom; i < replayHandlers.size(); i++)
+ {
+ Mutation undeliveredMutation = replayingMutations.get(i);
+ int ttl = calculateHintTTL(replayingMutations);
+ ReplayWriteResponseHandler handler = replayHandlers.get(i);
- /*
- * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
- * when a replica is down or a write request times out.
- */
- private void replayMutations(List<RowMutation> mutations, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
- {
- int ttl = calculateHintTTL(mutations, writtenAt);
- if (ttl <= 0)
- return; // this batchlog entry has 'expired'
-
- List<InetAddress> liveEndpoints = new ArrayList<>();
- List<InetAddress> hintEndpoints = new ArrayList<>();
-
- for (RowMutation mutation : mutations)
+ if (ttl > 0 && handler != null)
+ for (InetAddress endpoint : handler.undelivered)
+
StorageProxy.writeHintForMutation(undeliveredMutation, writtenAt, ttl,
endpoint);
+ }
+ }
+ catch (IOException e)
+ {
+ logger.error("Cannot schedule hints for undelivered batch",
e);
+ }
+ }
+
+ private List<ReplayWriteResponseHandler> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
{
+ List<ReplayWriteResponseHandler> handlers = new ArrayList<>(mutations.size());
+ for (Mutation mutation : mutations)
+ {
+ ReplayWriteResponseHandler handler = sendSingleReplayMutation(mutation, writtenAt, ttl);
+ if (handler != null)
+ handlers.add(handler);
+ }
+ return handlers;
+ }
+
+ /**
+ * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
+ * when a replica is down or a write request times out.
+ *
+ * @return direct delivery handler to wait on or null, if no live nodes found
+ */
+ private ReplayWriteResponseHandler sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
+ {
+ Set<InetAddress> liveEndpoints = new HashSet<>();
String ks = mutation.getKeyspaceName();
- Token tk = StorageService.getPartitioner().getToken(mutation.key());
- int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, version);
+ Token<?> tk = StorageService.getPartitioner().getToken(mutation.key());
for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
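
The new Batch class above splits replay into stages: replay() deserializes the mutations, drops truncated column families, sends the rest, and records a ReplayWriteResponseHandler per mutation; finish() then waits on the handlers in order, and the first write timeout redirects that mutation and every one after it to hints rather than blocking further. A minimal sketch of that wait-then-hint fallback, using plain java.util.concurrent Futures and a stand-in HintWriter in place of the real handler and hint machinery:

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReplayFinishSketch
{
    // Stand-in for the hint path taken in writeHintsForUndeliveredEndpoints().
    interface HintWriter { void writeHintsStartingFrom(int index); }

    static void finish(List<Future<Void>> replayHandlers, HintWriter hints, long timeoutMillis)
            throws ExecutionException, InterruptedException
    {
        for (int i = 0; i < replayHandlers.size(); i++)
        {
            try
            {
                replayHandlers.get(i).get(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e)
            {
                // Mirrors the WriteTimeoutException branch: hint mutation i and
                // everything after it, then stop waiting on the rest.
                hints.writeHintsStartingFrom(i);
                return;
            }
        }
    }
}
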
http://git-wip-us.apache.org/repos/asf/cassandra/blob/440824c1/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/440824c1/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
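
The StorageService diff body is omitted from this message, but the CHANGES entry "Force batchlog replay before decommissioning a node (CASSANDRA-7446)" together with the Future-returning replay trigger suggests the decommission path can now block until a forced replay has drained the local batchlog. A hypothetical sketch of that call shape (BatchlogReplayer is a stand-in interface, not the real API):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class DecommissionSketch
{
    // Stand-in for the batchlog replay trigger, which in this commit returns a
    // Future so callers can wait for completion.
    interface BatchlogReplayer { Future<?> startBatchlogReplay(); }

    // Replay (or hint) any accumulated batches before this node leaves the ring,
    // so no batchlog entries are stranded on the decommissioned node.
    static void drainBatchlogBeforeDecommission(BatchlogReplayer replayer)
            throws ExecutionException, InterruptedException
    {
        replayer.startBatchlogReplay().get();
    }
}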