This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new d5e5c45 Align record header of FQL and audit binary log
d5e5c45 is described below
commit d5e5c459f5c4c54c853b5fcfb5c2b3bfeee0d59c
Author: Per Otterström <[email protected]>
AuthorDate: Wed Apr 24 14:41:57 2019 +0200
Align record header of FQL and audit binary log
Patch by Per Otterström; reviewed by Vinay Chella and marcuse for
CASSANDRA-15076
---
CHANGES.txt | 1 +
conf/cassandra.yaml | 5 -
.../org/apache/cassandra/audit/BinAuditLogger.java | 17 ++-
.../apache/cassandra/audit/FullQueryLogger.java | 25 ++--
.../org/apache/cassandra/tools/AuditLogViewer.java | 67 ++++++++--
.../org/apache/cassandra/utils/binlog/BinLog.java | 32 ++++-
.../apache/cassandra/audit/BinAuditLoggerTest.java | 6 +-
.../cassandra/audit/FullQueryLoggerTest.java | 19 ++-
.../apache/cassandra/tools/AuditLogViewerTest.java | 138 ++++++++++++++++++++-
.../apache/cassandra/utils/binlog/BinLogTest.java | 73 +++++++++--
.../apache/cassandra/fqltool/FQLQueryReader.java | 38 +++++-
.../apache/cassandra/fqltool/commands/Dump.java | 21 +++-
.../apache/cassandra/fqltool/FQLReplayTest.java | 85 +++++++++++++
13 files changed, 458 insertions(+), 69 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 43f8cbe..3d1991a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha3
+ * Align record header of FQL and audit binary log (CASSANDRA-15076)
* Shuffle forwarding replica for messages to non-local DC (CASSANDRA-15318)
* Optimise native protocol ASCII string encoding (CASSANDRA-15410)
* Make sure all exceptions are propagated in DebuggableThreadPoolExecutor
(CASSANDRA-15332)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index f3e5c75..9a79f24 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1259,11 +1259,6 @@ back_pressure_strategy:
# each write which may be lower in order to facilitate availability.
# ideal_consistency_level: EACH_QUORUM
-# Path to write full query log data to when the full query log is enabled
-# The full query log will recrusively delete the contents of this path at
-# times. Don't place links in this directory to other parts of the filesystem.
-#full_query_log_dir: /tmp/cassandrafullquerylog
-
# Automatically upgrade sstables after upgrade - if there is no ordinary
compaction to do, the
# oldest non-upgraded sstable will get upgraded to the latest version
# automatic_sstable_upgrade: false
diff --git a/src/java/org/apache/cassandra/audit/BinAuditLogger.java
b/src/java/org/apache/cassandra/audit/BinAuditLogger.java
index 23b9977..83ed3de 100644
--- a/src/java/org/apache/cassandra/audit/BinAuditLogger.java
+++ b/src/java/org/apache/cassandra/audit/BinAuditLogger.java
@@ -30,8 +30,8 @@ import org.apache.cassandra.utils.concurrent.WeightedQueue;
public class BinAuditLogger extends BinLogAuditLogger implements IAuditLogger
{
- public static final String TYPE = "type";
- public static final String AUDITLOG_TYPE = "AuditLog";
+ public static final long CURRENT_VERSION = 0;
+ public static final String AUDITLOG_TYPE = "audit";
public static final String AUDITLOG_MESSAGE = "message";
public BinAuditLogger()
@@ -71,10 +71,19 @@ public class BinAuditLogger extends BinLogAuditLogger
implements IAuditLogger
this.message = message;
}
+ protected long version()
+ {
+ return CURRENT_VERSION;
+ }
+
+ protected String type()
+ {
+ return AUDITLOG_TYPE;
+ }
+
@Override
- public void writeMarshallable(WireOut wire)
+ public void writeMarshallablePayload(WireOut wire)
{
- wire.write(TYPE).text(AUDITLOG_TYPE);
wire.write(AUDITLOG_MESSAGE).text(message);
}
diff --git a/src/java/org/apache/cassandra/audit/FullQueryLogger.java
b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
index 9c1f472..7982940 100644
--- a/src/java/org/apache/cassandra/audit/FullQueryLogger.java
+++ b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
@@ -23,7 +23,6 @@ import java.util.List;
import javax.annotation.Nullable;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
@@ -45,10 +44,7 @@ import org.github.jamm.MemoryLayoutSpecification;
*/
public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger
{
- public static final long CURRENT_VERSION = 0; // encode a dummy version,
to prevent pain in decoding in the future
-
- public static final String VERSION = "version";
- public static final String TYPE = "type";
+ public static final long CURRENT_VERSION = 0;
public static final String PROTOCOL_VERSION = "protocol-version";
public static final String QUERY_OPTIONS = "query-options";
@@ -169,9 +165,9 @@ public class FullQueryLogger extends BinLogAuditLogger
implements IAuditLogger
}
@Override
- public void writeMarshallable(WireOut wire)
+ public void writeMarshallablePayload(WireOut wire)
{
- super.writeMarshallable(wire);
+ super.writeMarshallablePayload(wire);
wire.write(QUERY).text(query);
}
@@ -229,9 +225,9 @@ public class FullQueryLogger extends BinLogAuditLogger
implements IAuditLogger
}
@Override
- public void writeMarshallable(WireOut wire)
+ public void writeMarshallablePayload(WireOut wire)
{
- super.writeMarshallable(wire);
+ super.writeMarshallablePayload(wire);
wire.write(BATCH_TYPE).text(batchType.name());
ValueOut valueOut = wire.write(QUERIES);
valueOut.int32(queries.size());
@@ -305,11 +301,14 @@ public class FullQueryLogger extends BinLogAuditLogger
implements IAuditLogger
}
@Override
- public void writeMarshallable(WireOut wire)
+ protected long version()
{
- wire.write(VERSION).int16(CURRENT_VERSION);
- wire.write(TYPE).text(type());
+ return CURRENT_VERSION;
+ }
+ @Override
+ public void writeMarshallablePayload(WireOut wire)
+ {
wire.write(QUERY_START_TIME).int64(queryStartTime);
wire.write(PROTOCOL_VERSION).int32(protocolVersion);
wire.write(QUERY_OPTIONS).bytes(BytesStore.wrap(queryOptionsBuffer.nioBuffer()));
@@ -339,8 +338,6 @@ public class FullQueryLogger extends BinLogAuditLogger
implements IAuditLogger
? Ints.checkedCast(ObjectSizes.sizeOf(keyspace)) //
keyspace
: OBJECT_REFERENCE_SIZE); // null
}
-
- protected abstract String type();
}
}
diff --git a/src/java/org/apache/cassandra/tools/AuditLogViewer.java
b/src/java/org/apache/cassandra/tools/AuditLogViewer.java
index 01ea7b3..f1a6e37 100644
--- a/src/java/org/apache/cassandra/tools/AuditLogViewer.java
+++ b/src/java/org/apache/cassandra/tools/AuditLogViewer.java
@@ -40,6 +40,7 @@ import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.WireIn;
import org.apache.cassandra.audit.BinAuditLogger;
+import org.apache.cassandra.utils.binlog.BinLog;
/**
* Tool to view the contenst of AuditLog files in human readable format.
Default implementation for AuditLog files
@@ -51,6 +52,7 @@ public class AuditLogViewer
private static final String TOOL_NAME = "auditlogviewer";
private static final String ROLL_CYCLE = "roll_cycle";
private static final String FOLLOW = "follow";
+ private static final String IGNORE = "ignore";
private static final String HELP_OPTION = "help";
public static void main(String[] args)
@@ -59,7 +61,7 @@ public class AuditLogViewer
try
{
- dump(options.pathList, options.rollCycle, options.follow,
System.out::print);
+ dump(options.pathList, options.rollCycle, options.follow,
options.ignoreUnsupported, System.out::print);
}
catch (Exception e)
{
@@ -68,7 +70,7 @@ public class AuditLogViewer
}
}
- static void dump(List<String> pathList, String rollCycle, boolean follow,
Consumer<String> displayFun)
+ static void dump(List<String> pathList, String rollCycle, boolean follow,
boolean ignoreUnsupported, Consumer<String> displayFun)
{
//Backoff strategy for spinning on the queue, not aggressive at all as
this doesn't need to be low latency
Pauser pauser = Pauser.millis(100);
@@ -83,7 +85,7 @@ public class AuditLogViewer
hadWork = false;
for (ExcerptTailer tailer : tailers)
{
- while (tailer.readDocument(new DisplayRecord(displayFun)))
+ while (tailer.readDocument(new
DisplayRecord(ignoreUnsupported, displayFun)))
{
hadWork = true;
}
@@ -104,28 +106,69 @@ public class AuditLogViewer
private static class DisplayRecord implements ReadMarshallable
{
+ private final boolean ignoreUnsupported;
private final Consumer<String> displayFun;
- DisplayRecord(Consumer<String> displayFun)
+ DisplayRecord(boolean ignoreUnsupported, Consumer<String> displayFun)
{
+ this.ignoreUnsupported = ignoreUnsupported;
this.displayFun = displayFun;
}
public void readMarshallable(WireIn wireIn) throws IORuntimeException
{
- StringBuilder sb = new StringBuilder();
+ int version = wireIn.read(BinLog.VERSION).int16();
+ if (!isSupportedVersion(version))
+ {
+ return;
+ }
+ String type = wireIn.read(BinLog.TYPE).text();
+ if (!isSupportedType(type))
+ {
+ return;
+ }
- String type = wireIn.read(BinAuditLogger.TYPE).text();
+ StringBuilder sb = new StringBuilder();
sb.append("Type: ")
.append(type)
+ .append(System.lineSeparator())
+ .append("LogMessage: ")
+ .append(wireIn.read(BinAuditLogger.AUDITLOG_MESSAGE).text())
.append(System.lineSeparator());
- if (null != type && type.equals(BinAuditLogger.AUDITLOG_TYPE))
+ displayFun.accept(sb.toString());
+ }
+
+ private boolean isSupportedVersion(int version)
+ {
+ if (version <= BinAuditLogger.CURRENT_VERSION)
{
- sb.append("LogMessage:
").append(wireIn.read(BinAuditLogger.AUDITLOG_MESSAGE).text()).append(System.lineSeparator());
+ return true;
}
- displayFun.accept(sb.toString());
+ if (ignoreUnsupported)
+ {
+ return false;
+ }
+
+ throw new IORuntimeException("Unsupported record version [" +
version
+ + "] - highest supported version is
[" + BinAuditLogger.CURRENT_VERSION + ']');
+ }
+
+ private boolean isSupportedType(String type)
+ {
+ if (BinAuditLogger.AUDITLOG_TYPE.equals(type))
+ {
+ return true;
+ }
+
+ if (ignoreUnsupported)
+ {
+ return false;
+ }
+
+ throw new IORuntimeException("Unsupported record type field [" +
type
+ + "] - supported type is [" +
BinAuditLogger.AUDITLOG_TYPE + ']');
}
}
@@ -134,6 +177,7 @@ public class AuditLogViewer
private final List<String> pathList;
private String rollCycle = "HOURLY";
private boolean follow;
+ private boolean ignoreUnsupported;
private AuditLogViewerOptions(String[] pathList)
{
@@ -166,6 +210,8 @@ public class AuditLogViewer
opts.follow = cmd.hasOption(FOLLOW);
+ opts.ignoreUnsupported = cmd.hasOption(IGNORE);
+
if (cmd.hasOption(ROLL_CYCLE))
{
opts.rollCycle = cmd.getOptionValue(ROLL_CYCLE);
@@ -191,8 +237,9 @@ public class AuditLogViewer
{
Options options = new Options();
- options.addOption(new Option("r", ROLL_CYCLE, false, "How often to
roll the log file was rolled. May be necessary for Chronicle to correctly parse
file names. (MINUTELY, HOURLY, DAILY). Default HOURLY."));
+ options.addOption(new Option("r", ROLL_CYCLE, true, "How often to
roll the log file was rolled. May be necessary for Chronicle to correctly parse
file names. (MINUTELY, HOURLY, DAILY). Default HOURLY."));
options.addOption(new Option("f", FOLLOW, false, "Upon reacahing
the end of the log continue indefinitely waiting for more records"));
+ options.addOption(new Option("i", IGNORE, false, "Silently ignore
unsupported records"));
options.addOption(new Option("h", HELP_OPTION, false, "display
this help message"));
return options;
diff --git a/src/java/org/apache/cassandra/utils/binlog/BinLog.java
b/src/java/org/apache/cassandra/utils/binlog/BinLog.java
index d4dac78..0fce6ee 100644
--- a/src/java/org/apache/cassandra/utils/binlog/BinLog.java
+++ b/src/java/org/apache/cassandra/utils/binlog/BinLog.java
@@ -53,6 +53,9 @@ public class BinLog implements Runnable
{
private static final Logger logger = LoggerFactory.getLogger(BinLog.class);
+ public static final String VERSION = "version";
+ public static final String TYPE = "type";
+
private ChronicleQueue queue;
private ExcerptAppender appender;
@VisibleForTesting
@@ -63,7 +66,19 @@ public class BinLog implements Runnable
private static final ReleaseableWriteMarshallable NO_OP = new
ReleaseableWriteMarshallable()
{
@Override
- public void writeMarshallable(WireOut wire)
+ protected long version()
+ {
+ return 0;
+ }
+
+ @Override
+ protected String type()
+ {
+ return "no-op";
+ }
+
+ @Override
+ public void writeMarshallablePayload(WireOut wire)
{
}
@@ -229,6 +244,21 @@ public class BinLog implements Runnable
public abstract static class ReleaseableWriteMarshallable implements
WriteMarshallable
{
+ @Override
+ public final void writeMarshallable(WireOut wire)
+ {
+ wire.write(VERSION).int16(version());
+ wire.write(TYPE).text(type());
+
+ writeMarshallablePayload(wire);
+ }
+
+ protected abstract long version();
+
+ protected abstract String type();
+
+ protected abstract void writeMarshallablePayload(WireOut wire);
+
public abstract void release();
}
}
diff --git a/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
b/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
index f9d2930..90c9325 100644
--- a/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
+++ b/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
@@ -76,12 +76,14 @@ public class BinAuditLoggerTest extends CQLTester
{
ExcerptTailer tailer = queue.createTailer();
assertTrue(tailer.readDocument(wire -> {
- assertEquals("AuditLog", wire.read("type").text());
+ assertEquals(0L, wire.read("version").int16());
+ assertEquals("audit", wire.read("type").text());
assertThat(wire.read("message").text(),
containsString(AuditLogEntryType.PREPARE_STATEMENT.toString()));
}));
assertTrue(tailer.readDocument(wire -> {
- assertEquals("AuditLog", wire.read("type").text());
+ assertEquals(0L, wire.read("version").int16());
+ assertEquals("audit", wire.read("type").text());
assertThat(wire.read("message").text(),
containsString(AuditLogEntryType.SELECT.toString()));
}));
assertFalse(tailer.readDocument(wire -> {
diff --git a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
index 525fa8e..62e5a50 100644
--- a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
+++ b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
@@ -55,11 +55,9 @@ import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.binlog.BinLogTest;
-import org.apache.cassandra.utils.binlog.DeletingArchiver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.apache.cassandra.audit.FullQueryLogger.BATCH;
@@ -72,9 +70,10 @@ import static
org.apache.cassandra.audit.FullQueryLogger.QUERY;
import static org.apache.cassandra.audit.FullQueryLogger.QUERY_OPTIONS;
import static org.apache.cassandra.audit.FullQueryLogger.QUERY_START_TIME;
import static org.apache.cassandra.audit.FullQueryLogger.SINGLE_QUERY;
-import static org.apache.cassandra.audit.FullQueryLogger.TYPE;
import static org.apache.cassandra.audit.FullQueryLogger.VALUES;
-import static org.apache.cassandra.audit.FullQueryLogger.VERSION;
+
+import static org.apache.cassandra.utils.binlog.BinLog.TYPE;
+import static org.apache.cassandra.utils.binlog.BinLog.VERSION;
public class FullQueryLoggerTest extends CQLTester
{
@@ -274,7 +273,7 @@ public class FullQueryLoggerTest extends CQLTester
instance.binLog.put(new Query("foo1", QueryOptions.DEFAULT,
queryState(), 1)
{
- public void writeMarshallable(WireOut wire)
+ public void writeMarshallablePayload(WireOut wire)
{
//Notify that the bin log is blocking now
binLogBlocked.release();
@@ -287,7 +286,7 @@ public class FullQueryLoggerTest extends CQLTester
{
e.printStackTrace();
}
- super.writeMarshallable(wire);
+ super.writeMarshallablePayload(wire);
}
public void release()
@@ -356,7 +355,7 @@ public class FullQueryLoggerTest extends CQLTester
instance.binLog.put(new Query("foo1", QueryOptions.DEFAULT,
queryState(), 1)
{
- public void writeMarshallable(WireOut wire)
+ public void writeMarshallablePayload(WireOut wire)
{
//Notify that the bin log is blocking now
binLogBlocked.release();
@@ -369,7 +368,7 @@ public class FullQueryLoggerTest extends CQLTester
{
e.printStackTrace();
}
- super.writeMarshallable(wire);
+ super.writeMarshallablePayload(wire);
}
public void release()
@@ -388,10 +387,10 @@ public class FullQueryLoggerTest extends CQLTester
AtomicInteger releasedCount = new AtomicInteger(0);
AtomicInteger writtenCount = new AtomicInteger(0);
instance.logRecord(new Query("foo3", QueryOptions.DEFAULT,
queryState(), 1) {
- public void writeMarshallable(WireOut wire)
+ public void writeMarshallablePayload(WireOut wire)
{
writtenCount.incrementAndGet();
- super.writeMarshallable(wire);
+ super.writeMarshallablePayload(wire);
}
public void release()
diff --git a/test/unit/org/apache/cassandra/tools/AuditLogViewerTest.java
b/test/unit/org/apache/cassandra/tools/AuditLogViewerTest.java
index 078f899..649712a 100644
--- a/test/unit/org/apache/cassandra/tools/AuditLogViewerTest.java
+++ b/test/unit/org/apache/cassandra/tools/AuditLogViewerTest.java
@@ -31,12 +31,16 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.wire.WireOut;
import org.apache.cassandra.audit.BinAuditLogger;
+import static org.junit.Assert.assertTrue;
+
public class AuditLogViewerTest
{
private Path path;
@@ -73,12 +77,138 @@ public class AuditLogViewerTest
//Read those written records
List<String> actualRecords = new ArrayList<>();
- AuditLogViewer.dump(ImmutableList.of(path.toString()),
RollCycles.TEST_SECONDLY.toString(), false, actualRecords::add);
+ AuditLogViewer.dump(ImmutableList.of(path.toString()),
RollCycles.TEST_SECONDLY.toString(), false, false, actualRecords::add);
+
+ assertRecordsMatch(records, actualRecords);
+ }
+ }
+
+ @Test (expected = IORuntimeException.class)
+ public void testRejectFutureVersionRecord()
+ {
+ try (ChronicleQueue queue =
ChronicleQueueBuilder.single(path.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+ {
+ ExcerptAppender appender = queue.acquireAppender();
+ appender.writeDocument(createFutureRecord());
+
+ AuditLogViewer.dump(ImmutableList.of(path.toString()),
RollCycles.TEST_SECONDLY.toString(), false, false, dummy -> {});
+ }
+ catch (Exception e)
+ {
+ assertTrue(e.getMessage().contains("Unsupported record version"));
+ throw e;
+ }
+ }
+
+ @Test
+ public void testIgnoreFutureVersionRecord()
+ {
+ List<String> records = new ArrayList<>();
+ records.add("Test foo bar 1");
+ records.add("Test foo bar 2");
+
+ try (ChronicleQueue queue =
ChronicleQueueBuilder.single(path.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+ {
+ ExcerptAppender appender = queue.acquireAppender();
+
+ //Write future record
+ appender.writeDocument(createFutureRecord());
+
+ //Write bunch of current records
+ records.forEach(s -> appender.writeDocument(new
BinAuditLogger.Message(s)));
+
+ //Read those written records
+ List<String> actualRecords = new ArrayList<>();
+ AuditLogViewer.dump(ImmutableList.of(path.toString()),
RollCycles.TEST_SECONDLY.toString(), false, true, actualRecords::add);
+
+ // Assert all current records are present
+ assertRecordsMatch(records, actualRecords);
+ }
+ }
+
+ @Test (expected = IORuntimeException.class)
+ public void testRejectUnknownTypeRecord()
+ {
+ try (ChronicleQueue queue =
ChronicleQueueBuilder.single(path.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+ {
+ ExcerptAppender appender = queue.acquireAppender();
+ appender.writeDocument(createUnknownTypeRecord());
+
+ AuditLogViewer.dump(ImmutableList.of(path.toString()),
RollCycles.TEST_SECONDLY.toString(), false, false, dummy -> {});
+ }
+ catch (Exception e)
+ {
+ assertTrue(e.getMessage().contains("Unsupported record type
field"));
+ throw e;
+ }
+ }
+
+ @Test
+ public void testIgnoreUnknownTypeRecord()
+ {
+ List<String> records = new ArrayList<>();
+ records.add("Test foo bar 1");
+ records.add("Test foo bar 2");
+
+ try (ChronicleQueue queue =
ChronicleQueueBuilder.single(path.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+ {
+ ExcerptAppender appender = queue.acquireAppender();
+
+ //Write unrecognized type record
+ appender.writeDocument(createUnknownTypeRecord());
+
+ //Write bunch of supported records
+ records.forEach(s -> appender.writeDocument(new
BinAuditLogger.Message(s)));
+
+ //Read those written records
+ List<String> actualRecords = new ArrayList<>();
+ AuditLogViewer.dump(ImmutableList.of(path.toString()),
RollCycles.TEST_SECONDLY.toString(), false, true, actualRecords::add);
- for (int i = 0; i < records.size(); i++)
+ // Assert all supported records are present
+ assertRecordsMatch(records, actualRecords);
+ }
+ }
+
+ private BinAuditLogger.Message createFutureRecord()
+ {
+ return new BinAuditLogger.Message("dummy message") {
+ protected long version()
+ {
+ return 999;
+ }
+
+ @Override
+ public void writeMarshallablePayload(WireOut wire)
{
-
Assert.assertTrue(actualRecords.get(i).contains(records.get(i)));
+ super.writeMarshallablePayload(wire);
+ wire.write("future-field").text("future_value");
}
+ };
+ }
+
+ private BinAuditLogger.Message createUnknownTypeRecord()
+ {
+ return new BinAuditLogger.Message("dummy message") {
+ protected String type()
+ {
+ return "unknown-type";
+ }
+
+ @Override
+ public void writeMarshallablePayload(WireOut wire)
+ {
+ super.writeMarshallablePayload(wire);
+ wire.write("unknown-field").text("unknown_value");
+ }
+ };
+ }
+
+ private void assertRecordsMatch(List<String> records, List<String>
actualRecords)
+ {
+ Assert.assertEquals(records.size(), actualRecords.size());
+ for (int i = 0; i < records.size(); i++)
+ {
+ Assert.assertTrue(actualRecords.get(i).contains(records.get(i)));
}
}
-}
\ No newline at end of file
+}
diff --git a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
index 76c42f2..81f1ea0 100644
--- a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
+++ b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
@@ -36,8 +36,6 @@ import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.WireOut;
import org.apache.cassandra.Util;
-import org.apache.cassandra.audit.AuditLogOptions;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
import static org.junit.Assert.assertEquals;
@@ -122,7 +120,17 @@ public class BinLogTest
releaseCount.incrementAndGet();
}
- public void writeMarshallable(WireOut wire)
+ protected long version()
+ {
+ return 0;
+ }
+
+ protected String type()
+ {
+ return "test";
+ }
+
+ public void writeMarshallablePayload(WireOut wire)
{
try
{
@@ -136,8 +144,17 @@ public class BinLogTest
});
binLog.put(new BinLog.ReleaseableWriteMarshallable()
{
+ protected long version()
+ {
+ return 0;
+ }
+
+ protected String type()
+ {
+ return "test";
+ }
- public void writeMarshallable(WireOut wire)
+ public void writeMarshallablePayload(WireOut wire)
{
}
@@ -182,7 +199,17 @@ public class BinLogTest
released.release();
}
- public void writeMarshallable(WireOut wire)
+ protected long version()
+ {
+ return 0;
+ }
+
+ protected String type()
+ {
+ return "test";
+ }
+
+ public void writeMarshallablePayload(WireOut wire)
{
}
@@ -227,7 +254,17 @@ public class BinLogTest
{
}
- public void writeMarshallable(WireOut wire)
+ protected long version()
+ {
+ return 0;
+ }
+
+ protected String type()
+ {
+ return "test";
+ }
+
+ public void writeMarshallablePayload(WireOut wire)
{
//Notify the bing log thread is about to block
binLogBlocked.release();
@@ -303,7 +340,17 @@ public class BinLogTest
{
}
- public void writeMarshallable(WireOut wire)
+ protected long version()
+ {
+ return 0;
+ }
+
+ protected String type()
+ {
+ return "test";
+ }
+
+ public void writeMarshallablePayload(WireOut wire)
{
//Notify the bing log thread is about to block
binLogBlocked.release();
@@ -420,7 +467,17 @@ public class BinLogTest
//Do nothing
}
- public void writeMarshallable(WireOut wire)
+ protected long version()
+ {
+ return 0;
+ }
+
+ protected String type()
+ {
+ return "test";
+ }
+
+ public void writeMarshallablePayload(WireOut wire)
{
wire.write("text").text(text);
}
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java
b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java
index 0c18b79..164b077 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java
@@ -32,14 +32,13 @@ import net.openhft.chronicle.wire.WireIn;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.transport.ProtocolVersion;
+import static org.apache.cassandra.audit.FullQueryLogger.CURRENT_VERSION;
import static
org.apache.cassandra.audit.FullQueryLogger.GENERATED_NOW_IN_SECONDS;
import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_TIMESTAMP;
import static org.apache.cassandra.audit.FullQueryLogger.KEYSPACE;
import static org.apache.cassandra.audit.FullQueryLogger.PROTOCOL_VERSION;
import static org.apache.cassandra.audit.FullQueryLogger.QUERY_OPTIONS;
import static org.apache.cassandra.audit.FullQueryLogger.QUERY_START_TIME;
-import static org.apache.cassandra.audit.FullQueryLogger.TYPE;
-import static org.apache.cassandra.audit.FullQueryLogger.VERSION;
import static org.apache.cassandra.audit.FullQueryLogger.BATCH;
import static org.apache.cassandra.audit.FullQueryLogger.BATCH_TYPE;
import static org.apache.cassandra.audit.FullQueryLogger.QUERIES;
@@ -47,14 +46,18 @@ import static
org.apache.cassandra.audit.FullQueryLogger.QUERY;
import static org.apache.cassandra.audit.FullQueryLogger.SINGLE_QUERY;
import static org.apache.cassandra.audit.FullQueryLogger.VALUES;
+import static org.apache.cassandra.utils.binlog.BinLog.TYPE;
+import static org.apache.cassandra.utils.binlog.BinLog.VERSION;
+
public class FQLQueryReader implements ReadMarshallable
{
private FQLQuery query;
public void readMarshallable(WireIn wireIn) throws IORuntimeException
{
- int currentVersion = wireIn.read(VERSION).int16();
- String type = wireIn.read(TYPE).text();
+ verifyVersion(wireIn);
+ String type = readType(wireIn);
+
long queryStartTime = wireIn.read(QUERY_START_TIME).int64();
int protocolVersion = wireIn.read(PROTOCOL_VERSION).int32();
QueryOptions queryOptions =
QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(QUERY_OPTIONS).bytes()),
ProtocolVersion.decode(protocolVersion, true));
@@ -105,12 +108,35 @@ public class FQLQueryReader implements ReadMarshallable
values);
break;
default:
- throw new RuntimeException("Unknown type: " + type);
+ throw new IORuntimeException("Unhandled record type: " + type);
+ }
+ }
+
+ private void verifyVersion(WireIn wireIn)
+ {
+ int version = wireIn.read(VERSION).int16();
+
+ if (version > CURRENT_VERSION)
+ {
+ throw new IORuntimeException("Unsupported record version [" +
version
+ + "] - highest supported version is
[" + CURRENT_VERSION + ']');
+ }
+ }
+
+ private String readType(WireIn wireIn) throws IORuntimeException
+ {
+ String type = wireIn.read(TYPE).text();
+ if (!SINGLE_QUERY.equals(type) && !BATCH.equals(type))
+ {
+ throw new IORuntimeException("Unsupported record type field [" +
type
+ + "] - supported record types are ["
+ SINGLE_QUERY + ", " + BATCH + ']');
}
+
+ return type;
}
public FQLQuery getQuery()
{
return query;
}
-}
\ No newline at end of file
+}
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java
b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java
index b3e1f22..46c731f 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java
@@ -31,6 +31,7 @@ import io.airlift.airline.Command;
import io.airlift.airline.Option;
import io.netty.buffer.Unpooled;
import net.openhft.chronicle.bytes.Bytes;
+import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptTailer;
@@ -42,6 +43,7 @@ import net.openhft.chronicle.wire.WireIn;
import org.apache.cassandra.audit.FullQueryLogger;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.binlog.BinLog;
/**
* Dump the contents of a list of paths containing full query logs
@@ -73,11 +75,20 @@ public class Dump implements Runnable
{
sb.setLength(0);
- int version = wireIn.read(FullQueryLogger.VERSION).int16();
- if (version != FullQueryLogger.CURRENT_VERSION)
- throw new UnsupportedOperationException("Full query log of
unexpected version " + version + " encountered");
+ int version = wireIn.read(BinLog.VERSION).int16();
+ if (version > FullQueryLogger.CURRENT_VERSION)
+ {
+ throw new IORuntimeException("Unsupported record version [" +
version
+ + "] - highest supported version
is [" + FullQueryLogger.CURRENT_VERSION + ']');
+ }
+
+ String type = wireIn.read(BinLog.TYPE).text();
+ if (!FullQueryLogger.SINGLE_QUERY.equals((type)) &&
!FullQueryLogger.BATCH.equals((type)))
+ {
+ throw new IORuntimeException("Unsupported record type field ["
+ type
+ + "] - supported record types are
[" + FullQueryLogger.SINGLE_QUERY + ", " + FullQueryLogger.BATCH + ']');
+ }
- String type = wireIn.read(FullQueryLogger.TYPE).text();
sb.append("Type: ")
.append(type)
.append(System.lineSeparator());
@@ -117,7 +128,7 @@ public class Dump implements Runnable
break;
default:
- throw new UnsupportedOperationException("Log entry of
unsupported type " + type);
+ throw new IORuntimeException("Log entry of unsupported
type " + type);
}
System.out.print(sb.toString());
diff --git
a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
index 8c573f4..9220c2e 100644
--- a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
+++ b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
@@ -36,10 +36,12 @@ import org.junit.Test;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
+import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.wire.WireOut;
import org.apache.cassandra.audit.FullQueryLogger;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.statements.BatchStatement;
@@ -51,6 +53,7 @@ import org.apache.cassandra.tools.Util;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.MergeIterator;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.binlog.BinLog;
import static
org.apache.cassandra.fqltool.QueryReplayer.ParsedTargetHost.fromString;
import static org.junit.Assert.assertArrayEquals;
@@ -586,6 +589,88 @@ public class FQLReplayTest
fromString("aaa:[email protected]:xyz");
}
+ @Test (expected = IORuntimeException.class)
+ public void testFutureVersion() throws Exception
+ {
+ FQLQueryReader reader = new FQLQueryReader();
+ File dir = Files.createTempDirectory("chronicle").toFile();
+ try (ChronicleQueue queue = ChronicleQueueBuilder.single(dir).build())
+ {
+ ExcerptAppender appender = queue.acquireAppender();
+ appender.writeDocument(new BinLog.ReleaseableWriteMarshallable() {
+ protected long version()
+ {
+ return 999;
+ }
+
+ protected String type()
+ {
+ return FullQueryLogger.SINGLE_QUERY;
+ }
+
+ public void writeMarshallablePayload(WireOut wire)
+ {
+ wire.write("future-field").text("future_value");
+ }
+
+ public void release()
+ {
+
+ }
+ });
+
+ ExcerptTailer tailer = queue.createTailer();
+ tailer.readDocument(reader);
+ }
+ catch (Exception e)
+ {
+ assertTrue(e.getMessage().contains("Unsupported record version"));
+ throw e;
+ }
+
+ }
+
+ @Test (expected = IORuntimeException.class)
+ public void testUnknownRecord() throws Exception
+ {
+ FQLQueryReader reader = new FQLQueryReader();
+ File dir = Files.createTempDirectory("chronicle").toFile();
+ try (ChronicleQueue queue = ChronicleQueueBuilder.single(dir).build())
+ {
+ ExcerptAppender appender = queue.acquireAppender();
+ appender.writeDocument(new BinLog.ReleaseableWriteMarshallable() {
+ protected long version()
+ {
+ return FullQueryLogger.CURRENT_VERSION;
+ }
+
+ protected String type()
+ {
+ return "unknown-type";
+ }
+
+ public void writeMarshallablePayload(WireOut wire)
+ {
+ wire.write("unknown-field").text("unknown_value");
+ }
+
+ public void release()
+ {
+
+ }
+ });
+
+ ExcerptTailer tailer = queue.createTailer();
+ tailer.readDocument(reader);
+ }
+ catch (Exception e)
+ {
+ assertTrue(e.getMessage().contains("Unsupported record type
field"));
+ throw e;
+ }
+
+ }
+
private void compareStatements(Statement statement1, Statement statement2)
{
assertTrue(statement1 instanceof SimpleStatement && statement2
instanceof SimpleStatement);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]