Merge branch 'cassandra-3.0' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/94a01f62 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/94a01f62 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/94a01f62 Branch: refs/heads/cassandra-3.X Commit: 94a01f62a6db2ffed561fbfac05c6caf5e2ba53c Parents: 7c759e2 ee85507 Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Tue Oct 25 09:43:17 2016 +0800 Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com> Committed: Tue Oct 25 09:43:57 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/tracing/TraceStateImpl.java | 7 +++++-- .../org/apache/cassandra/cql3/TraceCqlTest.java | 19 +++++++++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/94a01f62/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/94a01f62/src/java/org/apache/cassandra/tracing/TraceStateImpl.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tracing/TraceStateImpl.java index fe78e64,0000000..349000a mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java +++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java @@@ -1,127 -1,0 +1,130 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tracing; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + ++import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.WrappedRunnable; + +/** + * ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an + * operation is being traced. + */ +public class TraceStateImpl extends TraceState +{ + private static final Logger logger = LoggerFactory.getLogger(TraceStateImpl.class); - private static final int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = - Integer.parseInt(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs", "1")); ++ ++ @VisibleForTesting ++ public static int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = ++ Integer.parseInt(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs", "0")); + + private final Set<Future<?>> pendingFutures = ConcurrentHashMap.newKeySet(); + + public TraceStateImpl(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType) + { + super(coordinator, sessionId, traceType); + } + + protected void traceImpl(String message) + { + final String threadName = Thread.currentThread().getName(); + final int elapsed = elapsed(); + + executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, message, elapsed, threadName, ttl)); + if (logger.isTraceEnabled()) + logger.trace("Adding <{}> to trace events", message); + } + + /** + * Wait on submitted futures + */ + protected void waitForPendingEvents() + { + if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0) + return; + + try + { + if (logger.isTraceEnabled()) + logger.trace("Waiting for up to {} seconds for {} trace events to complete", + +WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, pendingFutures.size()); + + CompletableFuture.allOf(pendingFutures.toArray(new CompletableFuture<?>[pendingFutures.size()])) + .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS); + } + catch (TimeoutException ex) + { + if (logger.isTraceEnabled()) + logger.trace("Failed to wait for tracing events to complete in {} seconds", + WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + logger.error("Got exception whilst waiting for tracing events to complete", t); + } + } + + + void executeMutation(final Mutation mutation) + { + CompletableFuture<Void> fut = CompletableFuture.runAsync(new WrappedRunnable() + { + protected void runMayThrow() + { + mutateWithCatch(mutation); + } + }, StageManager.getStage(Stage.TRACING)); + + boolean ret = pendingFutures.add(fut); + if (!ret) + logger.warn("Failed to insert pending future, tracing synchronization may not work"); + } + + static void mutateWithCatch(Mutation mutation) + { + try + { + StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY, System.nanoTime()); + } + catch (OverloadedException e) + { + Tracing.logger.warn("Too many nodes are overloaded to save trace events"); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/94a01f62/test/unit/org/apache/cassandra/cql3/TraceCqlTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/TraceCqlTest.java index 735fb6a,0000000..f1af922 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/cql3/TraceCqlTest.java +++ b/test/unit/org/apache/cassandra/cql3/TraceCqlTest.java @@@ -1,126 -1,0 +1,145 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3; + ++import org.junit.AfterClass; ++import org.junit.BeforeClass; +import org.junit.Test; + +import com.datastax.driver.core.CodecRegistry; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.QueryTrace; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TupleType; +import com.datastax.driver.core.TupleValue; ++import org.apache.cassandra.tracing.TraceStateImpl; + +import static org.junit.Assert.assertEquals; + +public class TraceCqlTest extends CQLTester +{ ++ static int DEFAULT_WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS; ++ ++ @BeforeClass ++ public static void setUp() ++ { ++ // make sure we wait for trace events to complete, see CASSANDRA-12754 ++ DEFAULT_WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = TraceStateImpl.WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS; ++ TraceStateImpl.WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = 5; ++ } ++ ++ @AfterClass ++ public static void tearDown() ++ { ++ TraceStateImpl.WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = DEFAULT_WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS; ++ } ++ + @Test + public void testCqlStatementTracing() throws Throwable + { + requireNetwork(); + + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test"); + + try (Session session = sessionNet()) + { + String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?"; + PreparedStatement pstmt = session.prepare(cql) + .enableTracing(); + QueryTrace trace = session.execute(pstmt.bind(1)).getExecutionInfo().getQueryTrace(); + assertEquals(cql, trace.getParameters().get("query")); + + assertEquals("1", trace.getParameters().get("bound_var_0_id")); + + String cql2 = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id IN (?, ?, ?)"; + pstmt = session.prepare(cql2).enableTracing(); + trace = session.execute(pstmt.bind(19, 15, 16)).getExecutionInfo().getQueryTrace(); + assertEquals(cql2, trace.getParameters().get("query")); + assertEquals("19", trace.getParameters().get("bound_var_0_id")); + assertEquals("15", trace.getParameters().get("bound_var_1_id")); + assertEquals("16", trace.getParameters().get("bound_var_2_id")); + + //some more complex tests for tables with map and tuple data types and long bound values + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 tuple<int, text, float>, v3 map<int, text>)"); + execute("INSERT INTO %s (id, v1, v2, v3) values (?, ?, ?, ?)", 12, "mahdix", tuple(3, "bar", 2.1f), + map(1290, "birthday", 39, "anniversary")); + execute("INSERT INTO %s (id, v1, v2, v3) values (?, ?, ?, ?)", 274, "CassandraRocks", tuple(9, "foo", 3.14f), + map(9181, "statement", 716, "public speech")); + + cql = "SELECT id, v1, v2, v3 FROM " + KEYSPACE + '.' + currentTable() + " WHERE v2 = ? ALLOW FILTERING"; + pstmt = session.prepare(cql) + .enableTracing(); + TupleType tt = TupleType.of(ProtocolVersion.NEWEST_SUPPORTED, CodecRegistry.DEFAULT_INSTANCE, DataType.cint(), + DataType.text(), DataType.cfloat()); + TupleValue value = tt.newValue(); + value.setInt(0, 3); + value.setString(1, "bar"); + value.setFloat(2, 2.1f); + + trace = session.execute(pstmt.bind(value)).getExecutionInfo().getQueryTrace(); + assertEquals(cql, trace.getParameters().get("query")); + assertEquals("(3, 'bar', 2.1)", trace.getParameters().get("bound_var_0_v2")); + + cql2 = "SELECT id, v1, v2, v3 FROM " + KEYSPACE + '.' + currentTable() + " WHERE v3 CONTAINS KEY ? ALLOW FILTERING"; + pstmt = session.prepare(cql2).enableTracing(); + trace = session.execute(pstmt.bind(9181)).getExecutionInfo().getQueryTrace(); + + assertEquals(cql2, trace.getParameters().get("query")); + assertEquals("9181", trace.getParameters().get("bound_var_0_key(v3)")); + + String boundValue = "Indulgence announcing uncommonly met she continuing two unpleasing terminated. Now " + + "busy say down the shed eyes roof paid her. Of shameless collected suspicion existence " + + "in. Share walls stuff think but the arise guest. Course suffer to do he sussex it " + + "window advice. Yet matter enable misery end extent common men should. Her indulgence " + + "but assistance favourable cultivated everything collecting." + + "On projection apartments unsatiable so if he entreaties appearance. Rose you wife " + + "how set lady half wish. Hard sing an in true felt. Welcomed stronger if steepest " + + "ecstatic an suitable finished of oh. Entered at excited at forming between so " + + "produce. Chicken unknown besides attacks gay compact out you. Continuing no " + + "simplicity no favourable on reasonably melancholy estimating. Own hence views two " + + "ask right whole ten seems. What near kept met call old west dine. Our announcing " + + "sufficient why pianoforte. Full age foo set feel her told. Tastes giving in passed" + + "direct me valley as supply. End great stood boy noisy often way taken short. Rent the " + + "size our more door. Years no place abode in \uFEFFno child my. Man pianoforte too " + + "solicitude friendship devonshire ten ask. Course sooner its silent but formal she " + + "led. Extensive he assurance extremity at breakfast. Dear sure ye sold fine sell on. " + + "Projection at up connection literature insensible motionless projecting." + + "Nor hence hoped her after other known defer his. For county now sister engage had " + + "season better had waited. Occasional mrs interested far expression acceptance. Day " + + "either mrs talent pulled men rather regret admire but. Life ye sake it shed. Five " + + "lady he cold in meet up. Service get met adapted matters offence for. Principles man " + + "any insipidity age you simplicity understood. Do offering pleasure no ecstatic " + + "whatever on mr directly. "; + + String cql3 = "SELECT id, v1, v2, v3 FROM " + KEYSPACE + '.' + currentTable() + " WHERE v3 CONTAINS ? ALLOW FILTERING"; + pstmt = session.prepare(cql3).enableTracing(); + trace = session.execute(pstmt.bind(boundValue)).getExecutionInfo().getQueryTrace(); + + assertEquals(cql3, trace.getParameters().get("query")); + + //when tracing is done, this boundValue will be surrounded by single quote, and first 1000 characters + //will be filtered. Here we take into account single quotes by adding them to the expected output + assertEquals("'" + boundValue.substring(0, 999) + "...'", trace.getParameters().get("bound_var_0_value(v3)")); + } + } +}