Merge branch 'cassandra-3.0' into cassandra-3.X

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/94a01f62
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/94a01f62
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/94a01f62

Branch: refs/heads/cassandra-3.X
Commit: 94a01f62a6db2ffed561fbfac05c6caf5e2ba53c
Parents: 7c759e2 ee85507
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Tue Oct 25 09:43:17 2016 +0800
Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Committed: Tue Oct 25 09:43:57 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                      |  1 +
 .../apache/cassandra/tracing/TraceStateImpl.java |  7 +++++--
 .../org/apache/cassandra/cql3/TraceCqlTest.java  | 19 +++++++++++++++++++
 3 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/94a01f62/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94a01f62/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/TraceStateImpl.java
index fe78e64,0000000..349000a
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
+++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
@@@ -1,127 -1,0 +1,130 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.tracing;
 +
 +import java.net.InetAddress;
 +import java.util.Collections;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.CompletableFuture;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +
++import com.google.common.annotations.VisibleForTesting;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.exceptions.OverloadedException;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
 +import org.apache.cassandra.utils.WrappedRunnable;
 +
 +/**
 + * ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an
 + * operation is being traced.
 + */
 +public class TraceStateImpl extends TraceState
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(TraceStateImpl.class);
-     private static final int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS =
-       
Integer.parseInt(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs",
 "1"));
++
++    @VisibleForTesting
++    public static int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS =
++      
Integer.parseInt(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs",
 "0"));
 +
 +    private final Set<Future<?>> pendingFutures = 
ConcurrentHashMap.newKeySet();
 +
 +    public TraceStateImpl(InetAddress coordinator, UUID sessionId, 
Tracing.TraceType traceType)
 +    {
 +        super(coordinator, sessionId, traceType);
 +    }
 +
 +    protected void traceImpl(String message)
 +    {
 +        final String threadName = Thread.currentThread().getName();
 +        final int elapsed = elapsed();
 +
 +        executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, 
message, elapsed, threadName, ttl));
 +        if (logger.isTraceEnabled())
 +            logger.trace("Adding <{}> to trace events", message);
 +    }
 +
 +    /**
 +     * Wait on submitted futures
 +     */
 +    protected void waitForPendingEvents()
 +    {
 +        if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0)
 +            return;
 +
 +        try
 +        {
 +            if (logger.isTraceEnabled())
 +                logger.trace("Waiting for up to {} seconds for {} trace 
events to complete",
 +                             +WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, 
pendingFutures.size());
 +
 +            CompletableFuture.allOf(pendingFutures.toArray(new 
CompletableFuture<?>[pendingFutures.size()]))
 +                             .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, 
TimeUnit.SECONDS);
 +        }
 +        catch (TimeoutException ex)
 +        {
 +            if (logger.isTraceEnabled())
 +                logger.trace("Failed to wait for tracing events to complete 
in {} seconds",
 +                             WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS);
 +        }
 +        catch (Throwable t)
 +        {
 +            JVMStabilityInspector.inspectThrowable(t);
 +            logger.error("Got exception whilst waiting for tracing events to 
complete", t);
 +        }
 +    }
 +
 +
 +    void executeMutation(final Mutation mutation)
 +    {
 +        CompletableFuture<Void> fut = CompletableFuture.runAsync(new 
WrappedRunnable()
 +        {
 +            protected void runMayThrow()
 +            {
 +                mutateWithCatch(mutation);
 +            }
 +        }, StageManager.getStage(Stage.TRACING));
 +
 +        boolean ret = pendingFutures.add(fut);
 +        if (!ret)
 +            logger.warn("Failed to insert pending future, tracing 
synchronization may not work");
 +    }
 +
 +    static void mutateWithCatch(Mutation mutation)
 +    {
 +        try
 +        {
 +            StorageProxy.mutate(Collections.singletonList(mutation), 
ConsistencyLevel.ANY, System.nanoTime());
 +        }
 +        catch (OverloadedException e)
 +        {
 +            Tracing.logger.warn("Too many nodes are overloaded to save trace 
events");
 +        }
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94a01f62/test/unit/org/apache/cassandra/cql3/TraceCqlTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/TraceCqlTest.java
index 735fb6a,0000000..f1af922
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/cql3/TraceCqlTest.java
+++ b/test/unit/org/apache/cassandra/cql3/TraceCqlTest.java
@@@ -1,126 -1,0 +1,145 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.cql3;
 +
++import org.junit.AfterClass;
++import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import com.datastax.driver.core.CodecRegistry;
 +import com.datastax.driver.core.DataType;
 +import com.datastax.driver.core.PreparedStatement;
 +import com.datastax.driver.core.ProtocolVersion;
 +import com.datastax.driver.core.QueryTrace;
 +import com.datastax.driver.core.Session;
 +import com.datastax.driver.core.TupleType;
 +import com.datastax.driver.core.TupleValue;
++import org.apache.cassandra.tracing.TraceStateImpl;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +public class TraceCqlTest extends CQLTester
 +{
++    static int DEFAULT_WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS;
++
++    @BeforeClass
++    public static void setUp()
++    {
++        // make sure we wait for trace events to complete, see CASSANDRA-12754
++        DEFAULT_WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = 
TraceStateImpl.WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS;
++        TraceStateImpl.WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = 5;
++    }
++
++    @AfterClass
++    public static void tearDown()
++    {
++        TraceStateImpl.WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = 
DEFAULT_WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS;
++    }
++
 +    @Test
 +    public void testCqlStatementTracing() throws Throwable
 +    {
 +        requireNetwork();
 +
 +        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
 +        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", 
"Cassandra");
 +        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", 
"test");
 +
 +        try (Session session = sessionNet())
 +        {
 +            String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + 
currentTable() + " WHERE id = ?";
 +            PreparedStatement pstmt = session.prepare(cql)
 +                                             .enableTracing();
 +            QueryTrace trace = 
session.execute(pstmt.bind(1)).getExecutionInfo().getQueryTrace();
 +            assertEquals(cql, trace.getParameters().get("query"));
 +
 +            assertEquals("1", trace.getParameters().get("bound_var_0_id"));
 +
 +            String cql2 = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + 
currentTable() + " WHERE id IN (?, ?, ?)";
 +            pstmt = session.prepare(cql2).enableTracing();
 +            trace = session.execute(pstmt.bind(19, 15, 
16)).getExecutionInfo().getQueryTrace();
 +            assertEquals(cql2, trace.getParameters().get("query"));
 +            assertEquals("19", trace.getParameters().get("bound_var_0_id"));
 +            assertEquals("15", trace.getParameters().get("bound_var_1_id"));
 +            assertEquals("16", trace.getParameters().get("bound_var_2_id"));
 +
 +            //some more complex tests for tables with map and tuple data types and long bound values
 +            createTable("CREATE TABLE %s (id int primary key, v1 text, v2 
tuple<int, text, float>, v3 map<int, text>)");
 +            execute("INSERT INTO %s (id, v1, v2, v3) values (?, ?, ?, ?)", 
12, "mahdix", tuple(3, "bar", 2.1f),
 +                    map(1290, "birthday", 39, "anniversary"));
 +            execute("INSERT INTO %s (id, v1, v2, v3) values (?, ?, ?, ?)", 
274, "CassandraRocks", tuple(9, "foo", 3.14f),
 +                    map(9181, "statement", 716, "public speech"));
 +
 +            cql = "SELECT id, v1, v2, v3 FROM " + KEYSPACE + '.' + 
currentTable() + " WHERE v2 = ? ALLOW FILTERING";
 +            pstmt = session.prepare(cql)
 +                           .enableTracing();
 +            TupleType tt = TupleType.of(ProtocolVersion.NEWEST_SUPPORTED, 
CodecRegistry.DEFAULT_INSTANCE, DataType.cint(),
 +                                        DataType.text(), DataType.cfloat());
 +            TupleValue value = tt.newValue();
 +            value.setInt(0, 3);
 +            value.setString(1, "bar");
 +            value.setFloat(2, 2.1f);
 +
 +            trace = 
session.execute(pstmt.bind(value)).getExecutionInfo().getQueryTrace();
 +            assertEquals(cql, trace.getParameters().get("query"));
 +            assertEquals("(3, 'bar', 2.1)", 
trace.getParameters().get("bound_var_0_v2"));
 +
 +            cql2 = "SELECT id, v1, v2, v3 FROM " + KEYSPACE + '.' + 
currentTable() + " WHERE v3 CONTAINS KEY ? ALLOW FILTERING";
 +            pstmt = session.prepare(cql2).enableTracing();
 +            trace = 
session.execute(pstmt.bind(9181)).getExecutionInfo().getQueryTrace();
 +
 +            assertEquals(cql2, trace.getParameters().get("query"));
 +            assertEquals("9181", 
trace.getParameters().get("bound_var_0_key(v3)"));
 +
 +            String boundValue = "Indulgence announcing uncommonly met she 
continuing two unpleasing terminated. Now " +
 +                                "busy say down the shed eyes roof paid her. 
Of shameless collected suspicion existence " +
 +                                "in. Share walls stuff think but the arise 
guest. Course suffer to do he sussex it " +
 +                                "window advice. Yet matter enable misery end 
extent common men should. Her indulgence " +
 +                                "but assistance favourable cultivated 
everything collecting." +
 +                                "On projection apartments unsatiable so if he 
entreaties appearance. Rose you wife " +
 +                                "how set lady half wish. Hard sing an in true 
felt. Welcomed stronger if steepest " +
 +                                "ecstatic an suitable finished of oh. Entered 
at excited at forming between so " +
 +                                "produce. Chicken unknown besides attacks gay 
compact out you. Continuing no " +
 +                                "simplicity no favourable on reasonably 
melancholy estimating. Own hence views two " +
 +                                "ask right whole ten seems. What near kept 
met call old west dine. Our announcing " +
 +                                "sufficient why pianoforte. Full age foo set 
feel her told. Tastes giving in passed" +
 +                                "direct me valley as supply. End great stood 
boy noisy often way taken short. Rent the " +
 +                                "size our more door. Years no place abode in 
\uFEFFno child my. Man pianoforte too " +
 +                                "solicitude friendship devonshire ten ask. 
Course sooner its silent but formal she " +
 +                                "led. Extensive he assurance extremity at 
breakfast. Dear sure ye sold fine sell on. " +
 +                                "Projection at up connection literature 
insensible motionless projecting." +
 +                                "Nor hence hoped her after other known defer 
his. For county now sister engage had " +
 +                                "season better had waited. Occasional mrs 
interested far expression acceptance. Day " +
 +                                "either mrs talent pulled men rather regret 
admire but. Life ye sake it shed. Five " +
 +                                "lady he cold in meet up. Service get met 
adapted matters offence for. Principles man " +
 +                                "any insipidity age you simplicity 
understood. Do offering pleasure no ecstatic " +
 +                                "whatever on mr directly. ";
 +
 +            String cql3 = "SELECT id, v1, v2, v3 FROM " + KEYSPACE + '.' + 
currentTable() + " WHERE v3 CONTAINS ? ALLOW FILTERING";
 +            pstmt = session.prepare(cql3).enableTracing();
 +            trace = 
session.execute(pstmt.bind(boundValue)).getExecutionInfo().getQueryTrace();
 +
 +            assertEquals(cql3, trace.getParameters().get("query"));
 +
 +            //when tracing is done, this boundValue will be surrounded by single quote, and first 1000 characters
 +            //will be filtered. Here we take into account single quotes by adding them to the expected output
 +            assertEquals("'" + boundValue.substring(0, 999) + "...'", 
trace.getParameters().get("bound_var_0_value(v3)"));
 +        }
 +    }
 +}

Reply via email to