This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d5e5c45  Align record header of FQL and audit binary log
d5e5c45 is described below

commit d5e5c459f5c4c54c853b5fcfb5c2b3bfeee0d59c
Author: Per Otterström <[email protected]>
AuthorDate: Wed Apr 24 14:41:57 2019 +0200

    Align record header of FQL and audit binary log
    
    Patch by Per Otterström; reviewed by Vinay Chella and marcuse for CASSANDRA-15076
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |   5 -
 .../org/apache/cassandra/audit/BinAuditLogger.java |  17 ++-
 .../apache/cassandra/audit/FullQueryLogger.java    |  25 ++--
 .../org/apache/cassandra/tools/AuditLogViewer.java |  67 ++++++++--
 .../org/apache/cassandra/utils/binlog/BinLog.java  |  32 ++++-
 .../apache/cassandra/audit/BinAuditLoggerTest.java |   6 +-
 .../cassandra/audit/FullQueryLoggerTest.java       |  19 ++-
 .../apache/cassandra/tools/AuditLogViewerTest.java | 138 ++++++++++++++++++++-
 .../apache/cassandra/utils/binlog/BinLogTest.java  |  73 +++++++++--
 .../apache/cassandra/fqltool/FQLQueryReader.java   |  38 +++++-
 .../apache/cassandra/fqltool/commands/Dump.java    |  21 +++-
 .../apache/cassandra/fqltool/FQLReplayTest.java    |  85 +++++++++++++
 13 files changed, 458 insertions(+), 69 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 43f8cbe..3d1991a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha3
+ * Align record header of FQL and audit binary log (CASSANDRA-15076)
  * Shuffle forwarding replica for messages to non-local DC (CASSANDRA-15318)
  * Optimise native protocol ASCII string encoding (CASSANDRA-15410)
  * Make sure all exceptions are propagated in DebuggableThreadPoolExecutor (CASSANDRA-15332)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index f3e5c75..9a79f24 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1259,11 +1259,6 @@ back_pressure_strategy:
 # each write which may be lower in order to facilitate availability.
 # ideal_consistency_level: EACH_QUORUM
 
-# Path to write full query log data to when the full query log is enabled
-# The full query log will recrusively delete the contents of this path at
-# times. Don't place links in this directory to other parts of the filesystem.
-#full_query_log_dir: /tmp/cassandrafullquerylog
-
 # Automatically upgrade sstables after upgrade - if there is no ordinary compaction to do, the
 # oldest non-upgraded sstable will get upgraded to the latest version
 # automatic_sstable_upgrade: false
diff --git a/src/java/org/apache/cassandra/audit/BinAuditLogger.java 
b/src/java/org/apache/cassandra/audit/BinAuditLogger.java
index 23b9977..83ed3de 100644
--- a/src/java/org/apache/cassandra/audit/BinAuditLogger.java
+++ b/src/java/org/apache/cassandra/audit/BinAuditLogger.java
@@ -30,8 +30,8 @@ import org.apache.cassandra.utils.concurrent.WeightedQueue;
 
 public class BinAuditLogger extends BinLogAuditLogger implements IAuditLogger
 {
-    public static final String TYPE = "type";
-    public static final String AUDITLOG_TYPE = "AuditLog";
+    public static final long CURRENT_VERSION = 0;
+    public static final String AUDITLOG_TYPE = "audit";
     public static final String AUDITLOG_MESSAGE = "message";
 
     public BinAuditLogger()
@@ -71,10 +71,19 @@ public class BinAuditLogger extends BinLogAuditLogger 
implements IAuditLogger
             this.message = message;
         }
 
+        protected long version()
+        {
+            return CURRENT_VERSION;
+        }
+
+        protected String type()
+        {
+            return AUDITLOG_TYPE;
+        }
+
         @Override
-        public void writeMarshallable(WireOut wire)
+        public void writeMarshallablePayload(WireOut wire)
         {
-            wire.write(TYPE).text(AUDITLOG_TYPE);
             wire.write(AUDITLOG_MESSAGE).text(message);
         }
 
diff --git a/src/java/org/apache/cassandra/audit/FullQueryLogger.java 
b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
index 9c1f472..7982940 100644
--- a/src/java/org/apache/cassandra/audit/FullQueryLogger.java
+++ b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import javax.annotation.Nullable;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
 
@@ -45,10 +44,7 @@ import org.github.jamm.MemoryLayoutSpecification;
  */
 public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger
 {
-    public static final long CURRENT_VERSION = 0; // encode a dummy version, 
to prevent pain in decoding in the future
-
-    public static final String VERSION = "version";
-    public static final String TYPE = "type";
+    public static final long CURRENT_VERSION = 0;
 
     public static final String PROTOCOL_VERSION = "protocol-version";
     public static final String QUERY_OPTIONS = "query-options";
@@ -169,9 +165,9 @@ public class FullQueryLogger extends BinLogAuditLogger 
implements IAuditLogger
         }
 
         @Override
-        public void writeMarshallable(WireOut wire)
+        public void writeMarshallablePayload(WireOut wire)
         {
-            super.writeMarshallable(wire);
+            super.writeMarshallablePayload(wire);
             wire.write(QUERY).text(query);
         }
 
@@ -229,9 +225,9 @@ public class FullQueryLogger extends BinLogAuditLogger 
implements IAuditLogger
         }
 
         @Override
-        public void writeMarshallable(WireOut wire)
+        public void writeMarshallablePayload(WireOut wire)
         {
-            super.writeMarshallable(wire);
+            super.writeMarshallablePayload(wire);
             wire.write(BATCH_TYPE).text(batchType.name());
             ValueOut valueOut = wire.write(QUERIES);
             valueOut.int32(queries.size());
@@ -305,11 +301,14 @@ public class FullQueryLogger extends BinLogAuditLogger 
implements IAuditLogger
         }
 
         @Override
-        public void writeMarshallable(WireOut wire)
+        protected long version()
         {
-            wire.write(VERSION).int16(CURRENT_VERSION);
-            wire.write(TYPE).text(type());
+            return CURRENT_VERSION;
+        }
 
+        @Override
+        public void writeMarshallablePayload(WireOut wire)
+        {
             wire.write(QUERY_START_TIME).int64(queryStartTime);
             wire.write(PROTOCOL_VERSION).int32(protocolVersion);
             
wire.write(QUERY_OPTIONS).bytes(BytesStore.wrap(queryOptionsBuffer.nioBuffer()));
@@ -339,8 +338,6 @@ public class FullQueryLogger extends BinLogAuditLogger 
implements IAuditLogger
                     ? Ints.checkedCast(ObjectSizes.sizeOf(keyspace))  // 
keyspace
                     : OBJECT_REFERENCE_SIZE);                         // null
         }
-
-        protected abstract String type();
     }
 
 }
diff --git a/src/java/org/apache/cassandra/tools/AuditLogViewer.java 
b/src/java/org/apache/cassandra/tools/AuditLogViewer.java
index 01ea7b3..f1a6e37 100644
--- a/src/java/org/apache/cassandra/tools/AuditLogViewer.java
+++ b/src/java/org/apache/cassandra/tools/AuditLogViewer.java
@@ -40,6 +40,7 @@ import net.openhft.chronicle.threads.Pauser;
 import net.openhft.chronicle.wire.ReadMarshallable;
 import net.openhft.chronicle.wire.WireIn;
 import org.apache.cassandra.audit.BinAuditLogger;
+import org.apache.cassandra.utils.binlog.BinLog;
 
 /**
  * Tool to view the contenst of AuditLog files in human readable format. 
Default implementation for AuditLog files
@@ -51,6 +52,7 @@ public class AuditLogViewer
     private static final String TOOL_NAME = "auditlogviewer";
     private static final String ROLL_CYCLE = "roll_cycle";
     private static final String FOLLOW = "follow";
+    private static final String IGNORE = "ignore";
     private static final String HELP_OPTION = "help";
 
     public static void main(String[] args)
@@ -59,7 +61,7 @@ public class AuditLogViewer
 
         try
         {
-            dump(options.pathList, options.rollCycle, options.follow, 
System.out::print);
+            dump(options.pathList, options.rollCycle, options.follow, 
options.ignoreUnsupported, System.out::print);
         }
         catch (Exception e)
         {
@@ -68,7 +70,7 @@ public class AuditLogViewer
         }
     }
 
-    static void dump(List<String> pathList, String rollCycle, boolean follow, 
Consumer<String> displayFun)
+    static void dump(List<String> pathList, String rollCycle, boolean follow, 
boolean ignoreUnsupported, Consumer<String> displayFun)
     {
         //Backoff strategy for spinning on the queue, not aggressive at all as 
this doesn't need to be low latency
         Pauser pauser = Pauser.millis(100);
@@ -83,7 +85,7 @@ public class AuditLogViewer
             hadWork = false;
             for (ExcerptTailer tailer : tailers)
             {
-                while (tailer.readDocument(new DisplayRecord(displayFun)))
+                while (tailer.readDocument(new 
DisplayRecord(ignoreUnsupported, displayFun)))
                 {
                     hadWork = true;
                 }
@@ -104,28 +106,69 @@ public class AuditLogViewer
 
     private static class DisplayRecord implements ReadMarshallable
     {
+        private final boolean ignoreUnsupported;
         private final Consumer<String> displayFun;
 
-        DisplayRecord(Consumer<String> displayFun)
+        DisplayRecord(boolean ignoreUnsupported, Consumer<String> displayFun)
         {
+            this.ignoreUnsupported = ignoreUnsupported;
             this.displayFun = displayFun;
         }
 
         public void readMarshallable(WireIn wireIn) throws IORuntimeException
         {
-            StringBuilder sb = new StringBuilder();
+            int version = wireIn.read(BinLog.VERSION).int16();
+            if (!isSupportedVersion(version))
+            {
+                return;
+            }
+            String type = wireIn.read(BinLog.TYPE).text();
+            if (!isSupportedType(type))
+            {
+                return;
+            }
 
-            String type = wireIn.read(BinAuditLogger.TYPE).text();
+            StringBuilder sb = new StringBuilder();
             sb.append("Type: ")
               .append(type)
+              .append(System.lineSeparator())
+              .append("LogMessage: ")
+              .append(wireIn.read(BinAuditLogger.AUDITLOG_MESSAGE).text())
               .append(System.lineSeparator());
 
-            if (null != type && type.equals(BinAuditLogger.AUDITLOG_TYPE))
+            displayFun.accept(sb.toString());
+        }
+
+        private boolean isSupportedVersion(int version)
+        {
+            if (version <= BinAuditLogger.CURRENT_VERSION)
             {
-                sb.append("LogMessage: 
").append(wireIn.read(BinAuditLogger.AUDITLOG_MESSAGE).text()).append(System.lineSeparator());
+                return true;
             }
 
-            displayFun.accept(sb.toString());
+            if (ignoreUnsupported)
+            {
+                return false;
+            }
+
+            throw new IORuntimeException("Unsupported record version [" + 
version
+                                         + "] - highest supported version is 
[" + BinAuditLogger.CURRENT_VERSION + ']');
+        }
+
+        private boolean isSupportedType(String type)
+        {
+            if (BinAuditLogger.AUDITLOG_TYPE.equals(type))
+            {
+                return true;
+            }
+
+            if (ignoreUnsupported)
+            {
+                return false;
+            }
+
+            throw new IORuntimeException("Unsupported record type field [" + 
type
+                                         + "] - supported type is [" + 
BinAuditLogger.AUDITLOG_TYPE + ']');
         }
     }
 
@@ -134,6 +177,7 @@ public class AuditLogViewer
         private final List<String> pathList;
         private String rollCycle = "HOURLY";
         private boolean follow;
+        private boolean ignoreUnsupported;
 
         private AuditLogViewerOptions(String[] pathList)
         {
@@ -166,6 +210,8 @@ public class AuditLogViewer
 
                 opts.follow = cmd.hasOption(FOLLOW);
 
+                opts.ignoreUnsupported = cmd.hasOption(IGNORE);
+
                 if (cmd.hasOption(ROLL_CYCLE))
                 {
                     opts.rollCycle = cmd.getOptionValue(ROLL_CYCLE);
@@ -191,8 +237,9 @@ public class AuditLogViewer
         {
             Options options = new Options();
 
-            options.addOption(new Option("r", ROLL_CYCLE, false, "How often to 
roll the log file was rolled. May be necessary for Chronicle to correctly parse 
file names. (MINUTELY, HOURLY, DAILY). Default HOURLY."));
+            options.addOption(new Option("r", ROLL_CYCLE, true, "How often to 
roll the log file was rolled. May be necessary for Chronicle to correctly parse 
file names. (MINUTELY, HOURLY, DAILY). Default HOURLY."));
             options.addOption(new Option("f", FOLLOW, false, "Upon reacahing 
the end of the log continue indefinitely waiting for more records"));
+            options.addOption(new Option("i", IGNORE, false, "Silently ignore 
unsupported records"));
             options.addOption(new Option("h", HELP_OPTION, false, "display 
this help message"));
 
             return options;
diff --git a/src/java/org/apache/cassandra/utils/binlog/BinLog.java 
b/src/java/org/apache/cassandra/utils/binlog/BinLog.java
index d4dac78..0fce6ee 100644
--- a/src/java/org/apache/cassandra/utils/binlog/BinLog.java
+++ b/src/java/org/apache/cassandra/utils/binlog/BinLog.java
@@ -53,6 +53,9 @@ public class BinLog implements Runnable
 {
     private static final Logger logger = LoggerFactory.getLogger(BinLog.class);
 
+    public static final String VERSION = "version";
+    public static final String TYPE = "type";
+
     private ChronicleQueue queue;
     private ExcerptAppender appender;
     @VisibleForTesting
@@ -63,7 +66,19 @@ public class BinLog implements Runnable
     private static final ReleaseableWriteMarshallable NO_OP = new 
ReleaseableWriteMarshallable()
     {
         @Override
-        public void writeMarshallable(WireOut wire)
+        protected long version()
+        {
+            return 0;
+        }
+
+        @Override
+        protected String type()
+        {
+            return "no-op";
+        }
+
+        @Override
+        public void writeMarshallablePayload(WireOut wire)
         {
         }
 
@@ -229,6 +244,21 @@ public class BinLog implements Runnable
 
     public abstract static class ReleaseableWriteMarshallable implements 
WriteMarshallable
     {
+        @Override
+        public final void writeMarshallable(WireOut wire)
+        {
+            wire.write(VERSION).int16(version());
+            wire.write(TYPE).text(type());
+
+            writeMarshallablePayload(wire);
+        }
+
+        protected abstract long version();
+
+        protected abstract String type();
+
+        protected abstract void writeMarshallablePayload(WireOut wire);
+
         public abstract void release();
     }
 }
diff --git a/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java 
b/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
index f9d2930..90c9325 100644
--- a/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
+++ b/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
@@ -76,12 +76,14 @@ public class BinAuditLoggerTest extends CQLTester
         {
             ExcerptTailer tailer = queue.createTailer();
             assertTrue(tailer.readDocument(wire -> {
-                assertEquals("AuditLog", wire.read("type").text());
+                assertEquals(0L, wire.read("version").int16());
+                assertEquals("audit", wire.read("type").text());
                 assertThat(wire.read("message").text(), 
containsString(AuditLogEntryType.PREPARE_STATEMENT.toString()));
             }));
 
             assertTrue(tailer.readDocument(wire -> {
-                assertEquals("AuditLog", wire.read("type").text());
+                assertEquals(0L, wire.read("version").int16());
+                assertEquals("audit", wire.read("type").text());
                 assertThat(wire.read("message").text(), 
containsString(AuditLogEntryType.SELECT.toString()));
             }));
             assertFalse(tailer.readDocument(wire -> {
diff --git a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java 
b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
index 525fa8e..62e5a50 100644
--- a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
+++ b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
@@ -55,11 +55,9 @@ import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.binlog.BinLogTest;
-import org.apache.cassandra.utils.binlog.DeletingArchiver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import static org.apache.cassandra.audit.FullQueryLogger.BATCH;
@@ -72,9 +70,10 @@ import static 
org.apache.cassandra.audit.FullQueryLogger.QUERY;
 import static org.apache.cassandra.audit.FullQueryLogger.QUERY_OPTIONS;
 import static org.apache.cassandra.audit.FullQueryLogger.QUERY_START_TIME;
 import static org.apache.cassandra.audit.FullQueryLogger.SINGLE_QUERY;
-import static org.apache.cassandra.audit.FullQueryLogger.TYPE;
 import static org.apache.cassandra.audit.FullQueryLogger.VALUES;
-import static org.apache.cassandra.audit.FullQueryLogger.VERSION;
+
+import static org.apache.cassandra.utils.binlog.BinLog.TYPE;
+import static org.apache.cassandra.utils.binlog.BinLog.VERSION;
 
 public class FullQueryLoggerTest extends CQLTester
 {
@@ -274,7 +273,7 @@ public class FullQueryLoggerTest extends CQLTester
             instance.binLog.put(new Query("foo1", QueryOptions.DEFAULT, 
queryState(), 1)
             {
 
-                public void writeMarshallable(WireOut wire)
+                public void writeMarshallablePayload(WireOut wire)
                 {
                     //Notify that the bin log is blocking now
                     binLogBlocked.release();
@@ -287,7 +286,7 @@ public class FullQueryLoggerTest extends CQLTester
                     {
                         e.printStackTrace();
                     }
-                    super.writeMarshallable(wire);
+                    super.writeMarshallablePayload(wire);
                 }
 
                 public void release()
@@ -356,7 +355,7 @@ public class FullQueryLoggerTest extends CQLTester
             instance.binLog.put(new Query("foo1", QueryOptions.DEFAULT, 
queryState(), 1)
             {
 
-                public void writeMarshallable(WireOut wire)
+                public void writeMarshallablePayload(WireOut wire)
                 {
                     //Notify that the bin log is blocking now
                     binLogBlocked.release();
@@ -369,7 +368,7 @@ public class FullQueryLoggerTest extends CQLTester
                     {
                         e.printStackTrace();
                     }
-                    super.writeMarshallable(wire);
+                    super.writeMarshallablePayload(wire);
                 }
 
                 public void release()
@@ -388,10 +387,10 @@ public class FullQueryLoggerTest extends CQLTester
             AtomicInteger releasedCount = new AtomicInteger(0);
             AtomicInteger writtenCount = new AtomicInteger(0);
             instance.logRecord(new Query("foo3", QueryOptions.DEFAULT, 
queryState(), 1) {
-                public void writeMarshallable(WireOut wire)
+                public void writeMarshallablePayload(WireOut wire)
                 {
                     writtenCount.incrementAndGet();
-                    super.writeMarshallable(wire);
+                    super.writeMarshallablePayload(wire);
                 }
 
                 public void release()
diff --git a/test/unit/org/apache/cassandra/tools/AuditLogViewerTest.java 
b/test/unit/org/apache/cassandra/tools/AuditLogViewerTest.java
index 078f899..649712a 100644
--- a/test/unit/org/apache/cassandra/tools/AuditLogViewerTest.java
+++ b/test/unit/org/apache/cassandra/tools/AuditLogViewerTest.java
@@ -31,12 +31,16 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import net.openhft.chronicle.core.io.IORuntimeException;
 import net.openhft.chronicle.queue.ChronicleQueue;
 import net.openhft.chronicle.queue.ChronicleQueueBuilder;
 import net.openhft.chronicle.queue.ExcerptAppender;
 import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.wire.WireOut;
 import org.apache.cassandra.audit.BinAuditLogger;
 
+import static org.junit.Assert.assertTrue;
+
 public class AuditLogViewerTest
 {
     private Path path;
@@ -73,12 +77,138 @@ public class AuditLogViewerTest
 
             //Read those written records
             List<String> actualRecords = new ArrayList<>();
-            AuditLogViewer.dump(ImmutableList.of(path.toString()), 
RollCycles.TEST_SECONDLY.toString(), false, actualRecords::add);
+            AuditLogViewer.dump(ImmutableList.of(path.toString()), 
RollCycles.TEST_SECONDLY.toString(), false, false, actualRecords::add);
+
+            assertRecordsMatch(records, actualRecords);
+        }
+    }
+
+    @Test (expected = IORuntimeException.class)
+    public void testRejectFutureVersionRecord()
+    {
+        try (ChronicleQueue queue = 
ChronicleQueueBuilder.single(path.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptAppender appender = queue.acquireAppender();
+            appender.writeDocument(createFutureRecord());
+
+            AuditLogViewer.dump(ImmutableList.of(path.toString()), 
RollCycles.TEST_SECONDLY.toString(), false, false, dummy -> {});
+        }
+        catch (Exception e)
+        {
+            assertTrue(e.getMessage().contains("Unsupported record version"));
+            throw e;
+        }
+    }
+
+    @Test
+    public void testIgnoreFutureVersionRecord()
+    {
+        List<String> records = new ArrayList<>();
+        records.add("Test foo bar 1");
+        records.add("Test foo bar 2");
+
+        try (ChronicleQueue queue = 
ChronicleQueueBuilder.single(path.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptAppender appender = queue.acquireAppender();
+
+            //Write future record
+            appender.writeDocument(createFutureRecord());
+
+            //Write bunch of current records
+            records.forEach(s -> appender.writeDocument(new 
BinAuditLogger.Message(s)));
+
+            //Read those written records
+            List<String> actualRecords = new ArrayList<>();
+            AuditLogViewer.dump(ImmutableList.of(path.toString()), 
RollCycles.TEST_SECONDLY.toString(), false, true, actualRecords::add);
+
+            // Assert all current records are present
+            assertRecordsMatch(records, actualRecords);
+        }
+    }
+
+    @Test (expected = IORuntimeException.class)
+    public void testRejectUnknownTypeRecord()
+    {
+        try (ChronicleQueue queue = 
ChronicleQueueBuilder.single(path.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptAppender appender = queue.acquireAppender();
+            appender.writeDocument(createUnknownTypeRecord());
+
+            AuditLogViewer.dump(ImmutableList.of(path.toString()), 
RollCycles.TEST_SECONDLY.toString(), false, false, dummy -> {});
+        }
+        catch (Exception e)
+        {
+            assertTrue(e.getMessage().contains("Unsupported record type 
field"));
+            throw e;
+        }
+    }
+
+    @Test
+    public void testIgnoreUnknownTypeRecord()
+    {
+        List<String> records = new ArrayList<>();
+        records.add("Test foo bar 1");
+        records.add("Test foo bar 2");
+
+        try (ChronicleQueue queue = 
ChronicleQueueBuilder.single(path.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptAppender appender = queue.acquireAppender();
+
+            //Write unrecognized type record
+            appender.writeDocument(createUnknownTypeRecord());
+
+            //Write bunch of supported records
+            records.forEach(s -> appender.writeDocument(new 
BinAuditLogger.Message(s)));
+
+            //Read those written records
+            List<String> actualRecords = new ArrayList<>();
+            AuditLogViewer.dump(ImmutableList.of(path.toString()), 
RollCycles.TEST_SECONDLY.toString(), false, true, actualRecords::add);
 
-            for (int i = 0; i < records.size(); i++)
+            // Assert all supported records are present
+            assertRecordsMatch(records, actualRecords);
+        }
+    }
+
+    private BinAuditLogger.Message createFutureRecord()
+    {
+        return new BinAuditLogger.Message("dummy message") {
+            protected long version()
+            {
+                return 999;
+            }
+
+            @Override
+            public void writeMarshallablePayload(WireOut wire)
             {
-                
Assert.assertTrue(actualRecords.get(i).contains(records.get(i)));
+                super.writeMarshallablePayload(wire);
+                wire.write("future-field").text("future_value");
             }
+        };
+    }
+
+    private BinAuditLogger.Message createUnknownTypeRecord()
+    {
+        return new BinAuditLogger.Message("dummy message") {
+            protected String type()
+            {
+                return "unknown-type";
+            }
+
+            @Override
+            public void writeMarshallablePayload(WireOut wire)
+            {
+                super.writeMarshallablePayload(wire);
+                wire.write("unknown-field").text("unknown_value");
+            }
+        };
+    }
+
+    private void assertRecordsMatch(List<String> records, List<String> 
actualRecords)
+    {
+        Assert.assertEquals(records.size(), actualRecords.size());
+        for (int i = 0; i < records.size(); i++)
+        {
+            Assert.assertTrue(actualRecords.get(i).contains(records.get(i)));
         }
     }
-}
\ No newline at end of file
+}
diff --git a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java 
b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
index 76c42f2..81f1ea0 100644
--- a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
+++ b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
@@ -36,8 +36,6 @@ import net.openhft.chronicle.queue.ExcerptTailer;
 import net.openhft.chronicle.queue.RollCycles;
 import net.openhft.chronicle.wire.WireOut;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.audit.AuditLogOptions;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.FileUtils;
 
 import static org.junit.Assert.assertEquals;
@@ -122,7 +120,17 @@ public class BinLogTest
                 releaseCount.incrementAndGet();
             }
 
-            public void writeMarshallable(WireOut wire)
+            protected long version()
+            {
+                return 0;
+            }
+
+            protected String type()
+            {
+                return "test";
+            }
+
+            public void writeMarshallablePayload(WireOut wire)
             {
                 try
                 {
@@ -136,8 +144,17 @@ public class BinLogTest
         });
         binLog.put(new BinLog.ReleaseableWriteMarshallable()
         {
+            protected long version()
+            {
+                return 0;
+            }
+
+            protected String type()
+            {
+                return "test";
+            }
 
-            public void writeMarshallable(WireOut wire)
+            public void writeMarshallablePayload(WireOut wire)
             {
 
             }
@@ -182,7 +199,17 @@ public class BinLogTest
                 released.release();
             }
 
-            public void writeMarshallable(WireOut wire)
+            protected long version()
+            {
+                return 0;
+            }
+
+            protected String type()
+            {
+                return "test";
+            }
+
+            public void writeMarshallablePayload(WireOut wire)
             {
 
             }
@@ -227,7 +254,17 @@ public class BinLogTest
                 {
                 }
 
-                public void writeMarshallable(WireOut wire)
+                protected long version()
+                {
+                    return 0;
+                }
+
+                protected String type()
+                {
+                    return "test";
+                }
+
+                public void writeMarshallablePayload(WireOut wire)
                 {
                     //Notify the bing log thread is about to block
                     binLogBlocked.release();
@@ -303,7 +340,17 @@ public class BinLogTest
                 {
                 }
 
-                public void writeMarshallable(WireOut wire)
+                protected long version()
+                {
+                    return 0;
+                }
+
+                protected String type()
+                {
+                    return "test";
+                }
+
+                public void writeMarshallablePayload(WireOut wire)
                 {
                     //Notify the bing log thread is about to block
                     binLogBlocked.release();
@@ -420,7 +467,17 @@ public class BinLogTest
                 //Do nothing
             }
 
-            public void writeMarshallable(WireOut wire)
+            protected long version()
+            {
+                return 0;
+            }
+
+            protected String type()
+            {
+                return "test";
+            }
+
+            public void writeMarshallablePayload(WireOut wire)
             {
                 wire.write("text").text(text);
             }
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java 
b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java
index 0c18b79..164b077 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java
@@ -32,14 +32,13 @@ import net.openhft.chronicle.wire.WireIn;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.transport.ProtocolVersion;
 
+import static org.apache.cassandra.audit.FullQueryLogger.CURRENT_VERSION;
 import static 
org.apache.cassandra.audit.FullQueryLogger.GENERATED_NOW_IN_SECONDS;
 import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_TIMESTAMP;
 import static org.apache.cassandra.audit.FullQueryLogger.KEYSPACE;
 import static org.apache.cassandra.audit.FullQueryLogger.PROTOCOL_VERSION;
 import static org.apache.cassandra.audit.FullQueryLogger.QUERY_OPTIONS;
 import static org.apache.cassandra.audit.FullQueryLogger.QUERY_START_TIME;
-import static org.apache.cassandra.audit.FullQueryLogger.TYPE;
-import static org.apache.cassandra.audit.FullQueryLogger.VERSION;
 import static org.apache.cassandra.audit.FullQueryLogger.BATCH;
 import static org.apache.cassandra.audit.FullQueryLogger.BATCH_TYPE;
 import static org.apache.cassandra.audit.FullQueryLogger.QUERIES;
@@ -47,14 +46,18 @@ import static org.apache.cassandra.audit.FullQueryLogger.QUERY;
 import static org.apache.cassandra.audit.FullQueryLogger.SINGLE_QUERY;
 import static org.apache.cassandra.audit.FullQueryLogger.VALUES;
 
+import static org.apache.cassandra.utils.binlog.BinLog.TYPE;
+import static org.apache.cassandra.utils.binlog.BinLog.VERSION;
+
 public class FQLQueryReader implements ReadMarshallable
 {
     private FQLQuery query;
 
     public void readMarshallable(WireIn wireIn) throws IORuntimeException
     {
-        int currentVersion = wireIn.read(VERSION).int16();
-        String type = wireIn.read(TYPE).text();
+        verifyVersion(wireIn);
+        String type = readType(wireIn);
+
         long queryStartTime = wireIn.read(QUERY_START_TIME).int64();
         int protocolVersion = wireIn.read(PROTOCOL_VERSION).int32();
         QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(QUERY_OPTIONS).bytes()), ProtocolVersion.decode(protocolVersion, true));
@@ -105,12 +108,35 @@ public class FQLQueryReader implements ReadMarshallable
                                            values);
                 break;
             default:
-                throw new RuntimeException("Unknown type: " + type);
+                throw new IORuntimeException("Unhandled record type: " + type);
+        }
+    }
+
+    private void verifyVersion(WireIn wireIn)
+    {
+        int version = wireIn.read(VERSION).int16();
+
+        if (version > CURRENT_VERSION)
+        {
+            throw new IORuntimeException("Unsupported record version [" + version
+                                         + "] - highest supported version is [" + CURRENT_VERSION + ']');
+        }
+    }
+
+    private String readType(WireIn wireIn) throws IORuntimeException
+    {
+        String type = wireIn.read(TYPE).text();
+        if (!SINGLE_QUERY.equals(type) && !BATCH.equals(type))
+        {
+            throw new IORuntimeException("Unsupported record type field [" + type
+                                         + "] - supported record types are [" + SINGLE_QUERY + ", " + BATCH + ']');
         }
+
+        return type;
     }
 
     public FQLQuery getQuery()
     {
         return query;
     }
-}
\ No newline at end of file
+}
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java
index b3e1f22..46c731f 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java
@@ -31,6 +31,7 @@ import io.airlift.airline.Command;
 import io.airlift.airline.Option;
 import io.netty.buffer.Unpooled;
 import net.openhft.chronicle.bytes.Bytes;
+import net.openhft.chronicle.core.io.IORuntimeException;
 import net.openhft.chronicle.queue.ChronicleQueue;
 import net.openhft.chronicle.queue.ChronicleQueueBuilder;
 import net.openhft.chronicle.queue.ExcerptTailer;
@@ -42,6 +43,7 @@ import net.openhft.chronicle.wire.WireIn;
 import org.apache.cassandra.audit.FullQueryLogger;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.binlog.BinLog;
 
 /**
  * Dump the contents of a list of paths containing full query logs
@@ -73,11 +75,20 @@ public class Dump implements Runnable
         {
             sb.setLength(0);
 
-            int version = wireIn.read(FullQueryLogger.VERSION).int16();
-            if (version != FullQueryLogger.CURRENT_VERSION)
-                throw new UnsupportedOperationException("Full query log of unexpected version " + version + " encountered");
+            int version = wireIn.read(BinLog.VERSION).int16();
+            if (version > FullQueryLogger.CURRENT_VERSION)
+            {
+                throw new IORuntimeException("Unsupported record version [" + version
+                                             + "] - highest supported version is [" + FullQueryLogger.CURRENT_VERSION + ']');
+            }
+
+            String type = wireIn.read(BinLog.TYPE).text();
+            if (!FullQueryLogger.SINGLE_QUERY.equals((type)) && !FullQueryLogger.BATCH.equals((type)))
+            {
+                throw new IORuntimeException("Unsupported record type field [" + type
+                                             + "] - supported record types are [" + FullQueryLogger.SINGLE_QUERY + ", " + FullQueryLogger.BATCH + ']');
+            }
 
-            String type = wireIn.read(FullQueryLogger.TYPE).text();
             sb.append("Type: ")
               .append(type)
               .append(System.lineSeparator());
@@ -117,7 +128,7 @@ public class Dump implements Runnable
                     break;
 
                 default:
-                    throw new UnsupportedOperationException("Log entry of unsupported type " + type);
+                    throw new IORuntimeException("Log entry of unsupported type " + type);
             }
 
             System.out.print(sb.toString());
diff --git a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
index 8c573f4..9220c2e 100644
--- a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
+++ b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
@@ -36,10 +36,12 @@ import org.junit.Test;
 import com.datastax.driver.core.CodecRegistry;
 import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.Statement;
+import net.openhft.chronicle.core.io.IORuntimeException;
 import net.openhft.chronicle.queue.ChronicleQueue;
 import net.openhft.chronicle.queue.ChronicleQueueBuilder;
 import net.openhft.chronicle.queue.ExcerptAppender;
 import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.wire.WireOut;
 import org.apache.cassandra.audit.FullQueryLogger;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.statements.BatchStatement;
@@ -51,6 +53,7 @@ import org.apache.cassandra.tools.Util;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.binlog.BinLog;
 
 import static org.apache.cassandra.fqltool.QueryReplayer.ParsedTargetHost.fromString;
 import static org.junit.Assert.assertArrayEquals;
@@ -586,6 +589,88 @@ public class FQLReplayTest
         fromString("aaa:[email protected]:xyz");
     }
 
+    @Test (expected = IORuntimeException.class)
+    public void testFutureVersion() throws Exception
+    {
+        FQLQueryReader reader = new FQLQueryReader();
+        File dir = Files.createTempDirectory("chronicle").toFile();
+        try (ChronicleQueue queue = ChronicleQueueBuilder.single(dir).build())
+        {
+            ExcerptAppender appender = queue.acquireAppender();
+            appender.writeDocument(new BinLog.ReleaseableWriteMarshallable() {
+                protected long version()
+                {
+                    return 999;
+                }
+
+                protected String type()
+                {
+                    return FullQueryLogger.SINGLE_QUERY;
+                }
+
+                public void writeMarshallablePayload(WireOut wire)
+                {
+                    wire.write("future-field").text("future_value");
+                }
+
+                public void release()
+                {
+
+                }
+            });
+
+            ExcerptTailer tailer = queue.createTailer();
+            tailer.readDocument(reader);
+        }
+        catch (Exception e)
+        {
+            assertTrue(e.getMessage().contains("Unsupported record version"));
+            throw e;
+        }
+
+    }
+
+    @Test (expected = IORuntimeException.class)
+    public void testUnknownRecord() throws Exception
+    {
+        FQLQueryReader reader = new FQLQueryReader();
+        File dir = Files.createTempDirectory("chronicle").toFile();
+        try (ChronicleQueue queue = ChronicleQueueBuilder.single(dir).build())
+        {
+            ExcerptAppender appender = queue.acquireAppender();
+            appender.writeDocument(new BinLog.ReleaseableWriteMarshallable() {
+                protected long version()
+                {
+                    return FullQueryLogger.CURRENT_VERSION;
+                }
+
+                protected String type()
+                {
+                    return "unknown-type";
+                }
+
+                public void writeMarshallablePayload(WireOut wire)
+                {
+                    wire.write("unknown-field").text("unknown_value");
+                }
+
+                public void release()
+                {
+
+                }
+            });
+
+            ExcerptTailer tailer = queue.createTailer();
+            tailer.readDocument(reader);
+        }
+        catch (Exception e)
+        {
+            assertTrue(e.getMessage().contains("Unsupported record type field"));
+            throw e;
+        }
+
+    }
+
     private void compareStatements(Statement statement1, Statement statement2)
     {
         assertTrue(statement1 instanceof SimpleStatement && statement2 instanceof SimpleStatement);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to