Repository: cassandra Updated Branches: refs/heads/trunk a05785d82 -> 6e00ab956
Allow using custom script for chronicle queue BinLog archival Patch by Pramod K Sivaraju and marcuse; reviewed by Ariel Weisberg and Sam Tunnicliffe for CASSANDRA-14373 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6e00ab95 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6e00ab95 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6e00ab95 Branch: refs/heads/trunk Commit: 6e00ab956eb0148a74e926666862e4cc78936301 Parents: a05785d Author: pksivar <[email protected]> Authored: Fri Aug 10 11:55:54 2018 -0700 Committer: Marcus Eriksson <[email protected]> Committed: Thu Oct 11 13:06:34 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + bin/nodetool | 25 +- conf/cassandra.yaml | 22 +- .../apache/cassandra/audit/AuditLogManager.java | 4 +- .../apache/cassandra/audit/AuditLogOptions.java | 45 ++-- .../apache/cassandra/audit/BinAuditLogger.java | 4 +- .../cassandra/audit/BinLogAuditLogger.java | 21 +- .../cassandra/audit/FullQueryLoggerOptions.java | 40 +++ .../org/apache/cassandra/config/Config.java | 4 +- .../cassandra/config/DatabaseDescriptor.java | 5 +- .../apache/cassandra/service/StorageProxy.java | 21 +- .../cassandra/service/StorageProxyMBean.java | 7 +- .../cassandra/service/StorageService.java | 7 +- .../tools/nodetool/EnableFullQueryLog.java | 29 +- .../apache/cassandra/utils/binlog/BinLog.java | 63 +---- .../cassandra/utils/binlog/BinLogArchiver.java | 29 ++ .../cassandra/utils/binlog/BinLogOptions.java | 53 ++++ .../utils/binlog/DeletingArchiver.java | 88 ++++++ .../utils/binlog/ExternalArchiver.java | 206 ++++++++++++++ .../cassandra/audit/FullQueryLoggerTest.java | 18 +- .../config/DatabaseDescriptorRefTest.java | 2 + .../cassandra/utils/binlog/BinLogTest.java | 14 +- .../utils/binlog/DeletingArchiverTest.java | 107 ++++++++ .../utils/binlog/ExternalArchiverTest.java | 268 +++++++++++++++++++ 24 files changed, 953 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e1fbb90..0bfc8c7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Allow using custom script for chronicle queue BinLog archival (CASSANDRA-14373) * Transient->Full range movements mishandle consistency level upgrade (CASSANDRA-14759) * ReplicaCollection follow-up (CASSANDRA-14726) * Transient node receives full data requests (CASSANDRA-14762) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/bin/nodetool ---------------------------------------------------------------------- diff --git a/bin/nodetool b/bin/nodetool index 3104582..7ba57b7 100755 --- a/bin/nodetool +++ b/bin/nodetool @@ -79,6 +79,11 @@ do fi JVM_ARGS="$JVM_ARGS -Dssl.enable=true $SSL_ARGS" ;; + --archive-command) + # archive-command can be multi-word, we need to special handle that in POSIX shell + ARCHIVE_COMMAND="$2" + shift + ;; -D*) JVM_ARGS="$JVM_ARGS $1" ;; @@ -93,11 +98,19 @@ if [ "x$MAX_HEAP_SIZE" = "x" ]; then MAX_HEAP_SIZE="128m" fi -"$JAVA" $JAVA_AGENT -ea -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \ - -XX:ParallelGCThreads=1 \ - -Dcassandra.storagedir="$cassandra_storagedir" \ - -Dlogback.configurationFile=logback-tools.xml \ - $JVM_ARGS \ - org.apache.cassandra.tools.NodeTool -p $JMX_PORT $ARGS +CMD=$(echo "$JAVA" $JAVA_AGENT -ea -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \ + -XX:ParallelGCThreads=1 \ + -Dcassandra.storagedir="$cassandra_storagedir" \ + -Dlogback.configurationFile=logback-tools.xml \ + $JVM_ARGS \ + org.apache.cassandra.tools.NodeTool -p $JMX_PORT $ARGS) + +if [ "x$ARCHIVE_COMMAND" != "x" ] +then + exec $CMD "--archive-command" "${ARCHIVE_COMMAND}" +else + exec $CMD +fi + # vi:ai sw=4 ts=4 tw=0 et http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 190ce77..feb9037 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1219,12 +1219,32 @@ audit_logging_options: enabled: false logger: BinAuditLogger # audit_logs_dir: - # included_keyspaces: + # included_keyspaces: # excluded_keyspaces: # included_categories: # excluded_categories: # included_users: # excluded_users: + # roll_cycle: HOURLY + # block: true + # max_queue_weight: 268435456 # 256 MiB + # max_log_size: 17179869184 # 16 GiB + ## archive command is "/path/to/script.sh %path" where %path is replaced with the file being rolled: + # archive_command: + # max_archive_retries: 10 + + +# default options for full query logging - these can be overridden from command line when executing +# nodetool enablefullquerylog +#full_query_logging_options: + # log_dir: + # roll_cycle: HOURLY + # block: true + # max_queue_weight: 268435456 # 256 MiB + # max_log_size: 17179869184 # 16 GiB + ## archive command is "/path/to/script.sh %path" where %path is replaced with the file being rolled: + # archive_command: + # max_archive_retries: 10 # validate tombstones on reads and compaction # can be either "disabled", "warn" or "exception" http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/audit/AuditLogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/audit/AuditLogManager.java b/src/java/org/apache/cassandra/audit/AuditLogManager.java index 25966f7..041bdee 100644 --- a/src/java/org/apache/cassandra/audit/AuditLogManager.java +++ b/src/java/org/apache/cassandra/audit/AuditLogManager.java @@ -293,7 +293,7 @@ public class AuditLogManager oldLogger.stop(); } - public void configureFQL(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize) + public void configureFQL(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize, String archiveCommand, int maxArchiveRetries) { if (path.equals(auditLogger.path())) throw new IllegalArgumentException(String.format("fullquerylogger path (%s) cannot be the same as the " + @@ -301,7 +301,7 @@ public class AuditLogManager path, auditLogger.path())); - fullQueryLogger.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize); + fullQueryLogger.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize, archiveCommand, maxArchiveRetries); } public void resetFQL(String fullQueryLogPath) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/audit/AuditLogOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/audit/AuditLogOptions.java b/src/java/org/apache/cassandra/audit/AuditLogOptions.java index 1888c45..3d0efa3 100644 --- a/src/java/org/apache/cassandra/audit/AuditLogOptions.java +++ b/src/java/org/apache/cassandra/audit/AuditLogOptions.java @@ -19,7 +19,9 @@ package org.apache.cassandra.audit; import org.apache.commons.lang3.StringUtils; -public class AuditLogOptions +import org.apache.cassandra.utils.binlog.BinLogOptions; + +public class AuditLogOptions extends BinLogOptions { public volatile boolean enabled = false; public String logger = BinAuditLogger.class.getSimpleName(); @@ -35,27 +37,24 @@ public class AuditLogOptions */ public String audit_logs_dir = System.getProperty("cassandra.logdir.audit", System.getProperty("cassandra.logdir",".")+"/audit/"); - /** - * Indicates if the AuditLog should block if the it falls behind or should drop audit log records. - * Default is set to true so that AuditLog records wont be lost - */ - public boolean block = true; - - /** - * Maximum weight of in memory queue for records waiting to be written to the audit log file - * before blocking or dropping the log records. For advanced configurations - */ - public int max_queue_weight = 256 * 1024 * 1024; - /** - * Maximum size of the rolled files to retain on disk before deleting the oldest file. For advanced configurations - */ - public long max_log_size = 16L * 1024L * 1024L * 1024L; - - /** - * How often to roll Audit log segments so they can potentially be reclaimed. Available options are: - * MINUTELY, HOURLY, DAILY, LARGE_DAILY, XLARGE_DAILY, HUGE_DAILY. - * For more options, refer: net.openhft.chronicle.queue.RollCycles - */ - public String roll_cycle = "HOURLY"; + public String toString() + { + return "AuditLogOptions{" + + "enabled=" + enabled + + ", logger='" + logger + '\'' + + ", included_keyspaces='" + included_keyspaces + '\'' + + ", excluded_keyspaces='" + excluded_keyspaces + '\'' + + ", included_categories='" + included_categories + '\'' + + ", excluded_categories='" + excluded_categories + '\'' + + ", included_users='" + included_users + '\'' + + ", excluded_users='" + excluded_users + '\'' + + ", audit_logs_dir='" + audit_logs_dir + '\'' + + ", archive_command='" + archive_command + '\'' + + ", roll_cycle='" + roll_cycle + '\'' + + ", block=" + block + + ", max_queue_weight=" + max_queue_weight + + ", max_log_size=" + max_log_size + + '}'; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/audit/BinAuditLogger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/audit/BinAuditLogger.java b/src/java/org/apache/cassandra/audit/BinAuditLogger.java index 3ac9499..bd3a158 100644 --- a/src/java/org/apache/cassandra/audit/BinAuditLogger.java +++ b/src/java/org/apache/cassandra/audit/BinAuditLogger.java @@ -39,7 +39,9 @@ public class BinAuditLogger extends BinLogAuditLogger implements IAuditLogger auditLoggingOptions.block, auditLoggingOptions.max_queue_weight, auditLoggingOptions.max_log_size, - false); + false, + auditLoggingOptions.archive_command, + auditLoggingOptions.max_archive_retries); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java b/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java index 7534650..d43bb4a 100644 --- a/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java +++ b/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +36,9 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.binlog.BinLog; +import org.apache.cassandra.utils.binlog.BinLogArchiver; +import org.apache.cassandra.utils.binlog.DeletingArchiver; +import org.apache.cassandra.utils.binlog.ExternalArchiver; abstract class BinLogAuditLogger implements IAuditLogger { @@ -55,10 +59,12 @@ abstract class BinLogAuditLogger implements IAuditLogger * @param blocking Whether the FQL should block if the FQL falls behind or should drop log records * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file + * @param archiveCommand the archive command to execute on rolled log files + * @param maxArchiveRetries max number of retries of failed archive commands */ - public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize) + public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize, String archiveCommand, int maxArchiveRetries) { - this.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize, true); + this.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize, true, archiveCommand, maxArchiveRetries); } /** @@ -69,8 +75,10 @@ abstract class BinLogAuditLogger implements IAuditLogger * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file * @param cleanDirectory Indicates to clean the directory before starting FullQueryLogger or not + * @param archiveCommand the archive command to execute on rolled log files + * @param maxArchiveRetries max number of retries of failed archive commands */ - public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize, boolean cleanDirectory) + public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize, boolean cleanDirectory, String archiveCommand, int maxArchiveRetries) { Preconditions.checkNotNull(path, "path was null"); File pathAsFile = path.toFile(); @@ -83,7 +91,7 @@ abstract class BinLogAuditLogger implements IAuditLogger Preconditions.checkNotNull(RollCycles.valueOf(rollCycle), "unrecognized roll cycle"); Preconditions.checkArgument(maxQueueWeight > 0, "maxQueueWeight must be > 0"); Preconditions.checkArgument(maxLogSize > 0, "maxLogSize must be > 0"); - logger.info("Attempting to configure full query logger path: {} Roll cycle: {} Blocking: {} Max queue weight: {} Max log size:{}", path, rollCycle, blocking, maxQueueWeight, maxLogSize); + logger.info("Attempting to configure full query logger path: {} Roll cycle: {} Blocking: {} Max queue weight: {} Max log size:{}, archive command: {}", path, rollCycle, blocking, maxQueueWeight, maxLogSize, archiveCommand); if (binLog != null) { @@ -91,6 +99,8 @@ abstract class BinLogAuditLogger implements IAuditLogger throw new IllegalStateException("Already configured"); } + // create the archiver before cleaning directories - ExternalArchiver will try to archive any existing file. + BinLogArchiver archiver = Strings.isNullOrEmpty(archiveCommand) ? new DeletingArchiver(maxLogSize) : new ExternalArchiver(archiveCommand, path, maxArchiveRetries); if (cleanDirectory) { logger.info("Cleaning directory: {} as requested",path); @@ -103,10 +113,9 @@ abstract class BinLogAuditLogger implements IAuditLogger } } } - this.path = path; this.blocking = blocking; - binLog = new BinLog(path, RollCycles.valueOf(rollCycle), maxQueueWeight, maxLogSize); + binLog = new BinLog(path, RollCycles.valueOf(rollCycle), maxQueueWeight, archiver); binLog.start(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/audit/FullQueryLoggerOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/audit/FullQueryLoggerOptions.java b/src/java/org/apache/cassandra/audit/FullQueryLoggerOptions.java new file mode 100644 index 0000000..825a8b8 --- /dev/null +++ b/src/java/org/apache/cassandra/audit/FullQueryLoggerOptions.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.audit; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.cassandra.utils.binlog.BinLogOptions; + +public class FullQueryLoggerOptions extends BinLogOptions +{ + public String log_dir = StringUtils.EMPTY; + + public String toString() + { + return "FullQueryLoggerOptions{" + + "log_dir='" + log_dir + '\'' + + ", archive_command='" + archive_command + '\'' + + ", roll_cycle='" + roll_cycle + '\'' + + ", block=" + block + + ", max_queue_weight=" + max_queue_weight + + ", max_log_size=" + max_log_size + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 782815e..9049131 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.audit.AuditLogOptions; +import org.apache.cassandra.audit.FullQueryLoggerOptions; import org.apache.cassandra.db.ConsistencyLevel; /** @@ -379,8 +380,6 @@ public class Config public RepairCommandPoolFullStrategy repair_command_pool_full_strategy = RepairCommandPoolFullStrategy.queue; public int repair_command_pool_size = concurrent_validations; - public String full_query_log_dir = null; - // parameters to adjust how much to delay startup until a certain amount of the cluster is connect to and marked alive public int block_for_peers_percentage = 70; public int block_for_peers_timeout_in_secs = 10; @@ -389,6 +388,7 @@ public class Config public boolean stream_entire_sstables = true; public volatile AuditLogOptions audit_logging_options = new AuditLogOptions(); + public volatile FullQueryLoggerOptions full_query_logging_options = new FullQueryLoggerOptions(); public CorruptedTombstoneStrategy corrupted_tombstone_strategy = CorruptedTombstoneStrategy.disabled; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index dc76431..de87de5 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -38,6 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.audit.AuditLogOptions; +import org.apache.cassandra.audit.FullQueryLoggerOptions; import org.apache.cassandra.auth.AllowAllInternodeAuthenticator; import org.apache.cassandra.auth.AuthConfig; import org.apache.cassandra.auth.IAuthenticator; @@ -2604,9 +2605,9 @@ public class DatabaseDescriptor return conf.repair_command_pool_full_strategy; } - public static String getFullQueryLogPath() + public static FullQueryLoggerOptions getFullQueryLogOptions() { - return conf.full_query_log_dir; + return conf.full_query_logging_options; } public static int getBlockForPeersPercentage() http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index b3adc47..630dc5d 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.service; +import java.io.File; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.nio.file.Paths; @@ -29,6 +30,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.cache.CacheLoader; import com.google.common.collect.*; import com.google.common.primitives.Ints; @@ -40,6 +42,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.audit.AuditLogManager; +import org.apache.cassandra.audit.FullQueryLoggerOptions; import org.apache.cassandra.batchlog.Batch; import org.apache.cassandra.batchlog.BatchlogManager; import org.apache.cassandra.concurrent.Stage; @@ -2720,17 +2723,25 @@ public class StorageProxy implements StorageProxyMBean } @Override - public void configureFullQueryLogger(String path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize) + public void configureFullQueryLogger(String path, String rollCycle, Boolean blocking, int maxQueueWeight, long maxLogSize, String archiveCommand, int maxArchiveRetries) { - path = path != null ? path : DatabaseDescriptor.getFullQueryLogPath(); - Preconditions.checkNotNull(path, "cassandra.yaml did not set full_query_log_dir and not set as parameter"); - AuditLogManager.getInstance().configureFQL(Paths.get(path), rollCycle, blocking, maxQueueWeight, maxLogSize); + FullQueryLoggerOptions fqlOptions = DatabaseDescriptor.getFullQueryLogOptions(); + path = path != null ? path : fqlOptions.log_dir; + rollCycle = rollCycle != null ? rollCycle : fqlOptions.roll_cycle; + blocking = blocking != null ? blocking : fqlOptions.block; + maxQueueWeight = maxQueueWeight != Integer.MIN_VALUE ? maxQueueWeight : fqlOptions.max_queue_weight; + maxLogSize = maxLogSize != Long.MIN_VALUE ? maxLogSize : fqlOptions.max_log_size; + archiveCommand = archiveCommand != null ? archiveCommand : fqlOptions.archive_command; + maxArchiveRetries = maxArchiveRetries != Integer.MIN_VALUE ? maxArchiveRetries : fqlOptions.max_archive_retries; + + Preconditions.checkNotNull(path, "cassandra.yaml did not set log_dir and not set as parameter"); + AuditLogManager.getInstance().configureFQL(Paths.get(path), rollCycle, blocking, maxQueueWeight, maxLogSize, archiveCommand, maxArchiveRetries); } @Override public void resetFullQueryLogger() { - AuditLogManager.getInstance().resetFQL(DatabaseDescriptor.getFullQueryLogPath()); + AuditLogManager.getInstance().resetFQL(DatabaseDescriptor.getFullQueryLogOptions().log_dir); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/service/StorageProxyMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java index efc163d..95f5f26 100644 --- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java +++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; + import org.apache.cassandra.db.ConsistencyLevel; public interface StorageProxyMBean @@ -80,8 +82,11 @@ public interface StorageProxyMBean * @param blocking Whether threads submitting queries to the query log should block if they can't be drained to the filesystem or alternatively drops samples and log * @param maxQueueWeight How many bytes of query data to queue before blocking or dropping samples * @param maxLogSize How many bytes of log data to store before dropping segments. Might not be respected if a log file hasn't rolled so it can be deleted. + * @param archiveCommand executable archiving the rolled log files, + * @param maxArchiveRetries max number of times to retry a failing archive command + * */ - public void configureFullQueryLogger(String path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize); + public void configureFullQueryLogger(String path, String rollCycle, Boolean blocking, int maxQueueWeight, long maxLogSize, @Nullable String archiveCommand, int maxArchiveRetries); /** * Disable the full query logger if it is enabled. http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index abc189e..7cd99de 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -5352,9 +5352,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE AuditLogManager.getInstance().enableAuditLog(auditLogOptions); logger.info("AuditLog is enabled with logger: [{}], included_keyspaces: [{}], excluded_keyspaces: [{}], " + - "included_categories: [{}], excluded_categories: [{}]," + - "included_users: [{}], excluded_users: [{}],", loggerName, auditLogOptions.included_keyspaces, auditLogOptions.excluded_keyspaces, - auditLogOptions.included_categories, auditLogOptions.excluded_categories, auditLogOptions.included_users, auditLogOptions.excluded_users); + "included_categories: [{}], excluded_categories: [{}], included_users: [{}], " + + "excluded_users: [{}], archive_command: [{}]", loggerName, auditLogOptions.included_keyspaces, auditLogOptions.excluded_keyspaces, + auditLogOptions.included_categories, auditLogOptions.excluded_categories, auditLogOptions.included_users, auditLogOptions.excluded_users, + auditLogOptions.archive_command); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/tools/nodetool/EnableFullQueryLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/EnableFullQueryLog.java b/src/java/org/apache/cassandra/tools/nodetool/EnableFullQueryLog.java index 624a301..1d35e66 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/EnableFullQueryLog.java +++ b/src/java/org/apache/cassandra/tools/nodetool/EnableFullQueryLog.java @@ -23,27 +23,34 @@ import io.airlift.airline.Option; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; -@Command(name = "enablefullquerylog", description = "Enable full query logging") +@Command(name = "enablefullquerylog", description = "Enable full query logging, defaults for the options are configured in cassandra.yaml") public class EnableFullQueryLog extends NodeToolCmd { - @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often to roll the log file (MINUTELY, HOURLY, DAILY). Default HOURLY.") - private String rollCycle = "HOURLY"; + @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often to roll the log file (MINUTELY, HOURLY, DAILY).") + private String rollCycle = null; - @Option(title = "blocking", name = {"--blocking"}, description = "If the queue is full whether to block producers or drop samples. Default true.") - private boolean blocking = true; + @Option(title = "blocking", name = {"--blocking"}, description = "If the queue is full whether to block producers or drop samples.") + private Boolean blocking = null; - @Option(title = "max_queue_weight", name = {"--max-queue-weight"}, description = "Maximum number of bytes of query data to queue to disk before blocking or dropping samples. Default 256 megabytes.") - private int maxQueueWeight = 256 * 1024 * 1024; + @Option(title = "max_queue_weight", name = {"--max-queue-weight"}, description = "Maximum number of bytes of query data to queue to disk before blocking or dropping samples.") + private int maxQueueWeight = Integer.MIN_VALUE; - @Option(title = "max_log_size", name = {"--max-log-size"}, description = "How many bytes of log data to store before dropping segments. Might not be respected if a log file hasn't rolled so it can be deleted. Default 16 gigabytes.") - private long maxLogSize = 16L * 1024L * 1024L * 1024L; + @Option(title = "max_log_size", name = {"--max-log-size"}, description = "How many bytes of log data to store before dropping segments. Might not be respected if a log file hasn't rolled so it can be deleted.") + private long maxLogSize = Long.MIN_VALUE; - @Option(title = "path", name = {"--path"}, description = "Path to store the full query log at. Will have it's contents recursively deleted. If not set the value from cassandra.yaml will be used.") + @Option(title = "path", name = {"--path"}, description = "Path to store the full query log at. Will have it's contents recursively deleted.") private String path = null; + @Option(title = "archive_command", name = {"--archive-command"}, description = "Command that will handle archiving rolled full query log files." + + " Format is \"/path/to/script.sh %path\" where %path will be replaced with the file to archive") + private String archiveCommand = null; + + @Option(title = "archive_retries", name = {"--max-archive-retries"}, description = "Max number of archive retries.") + private int archiveRetries = Integer.MIN_VALUE; + @Override public void execute(NodeProbe probe) { - probe.getSpProxy().configureFullQueryLogger(path, rollCycle, blocking, maxQueueWeight, maxLogSize); + probe.getSpProxy().configureFullQueryLogger(path, rollCycle, blocking, maxQueueWeight, maxLogSize, archiveCommand, archiveRetries); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/utils/binlog/BinLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/binlog/BinLog.java b/src/java/org/apache/cassandra/utils/binlog/BinLog.java index 0c8659e..d4dac78 100644 --- a/src/java/org/apache/cassandra/utils/binlog/BinLog.java +++ b/src/java/org/apache/cassandra/utils/binlog/BinLog.java @@ -18,12 +18,9 @@ package org.apache.cassandra.utils.binlog; -import java.io.File; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; @@ -35,7 +32,6 @@ import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.ChronicleQueueBuilder; import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.RollCycle; -import net.openhft.chronicle.queue.impl.StoreFileListener; import net.openhft.chronicle.wire.WireOut; import net.openhft.chronicle.wire.WriteMarshallable; import org.apache.cassandra.concurrent.NamedThreadFactory; @@ -53,7 +49,7 @@ import org.apache.cassandra.utils.concurrent.WeightedQueue; * to handle writing the log, making it available for readers, as well as log rolling. * */ -public class BinLog implements Runnable, StoreFileListener +public class BinLog implements Runnable { private static final Logger logger = LoggerFactory.getLogger(BinLog.class); @@ -62,17 +58,7 @@ public class BinLog implements Runnable, StoreFileListener @VisibleForTesting Thread binLogThread = new NamedThreadFactory("Binary Log thread").newThread(this); final WeightedQueue<ReleaseableWriteMarshallable> sampleQueue; - private final long maxLogSize; - - /** - * The files in the chronicle queue that have already rolled - */ - private Queue<File> chronicleStoreFiles = new ConcurrentLinkedQueue<>(); - - /** - * The number of bytes in store files that have already rolled - */ - private long bytesInStoreFiles; + private final BinLogArchiver archiver; private static final ReleaseableWriteMarshallable NO_OP = new ReleaseableWriteMarshallable() { @@ -90,25 +76,23 @@ public class BinLog implements Runnable, StoreFileListener private volatile boolean shouldContinue = true; /** - * - * @param path Path to store the BinLog. Can't be shared with anything else. - * @param rollCycle How often to roll the log file so it can potentially be deleted + * @param path Path to store the BinLog. Can't be shared with anything else. + * @param rollCycle How often to roll the log file so it can potentially be deleted * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping - * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file */ - public BinLog(Path path, RollCycle rollCycle, int maxQueueWeight, long maxLogSize) + public BinLog(Path path, RollCycle rollCycle, int maxQueueWeight, BinLogArchiver archiver) { Preconditions.checkNotNull(path, "path was null"); Preconditions.checkNotNull(rollCycle, "rollCycle was null"); Preconditions.checkArgument(maxQueueWeight > 0, "maxQueueWeight must be > 0"); - Preconditions.checkArgument(maxLogSize > 0, "maxLogSize must be > 0"); ChronicleQueueBuilder builder = ChronicleQueueBuilder.single(path.toFile()); builder.rollCycle(rollCycle); - builder.storeFileListener(this); + + sampleQueue = new WeightedQueue<>(maxQueueWeight); + this.archiver = archiver; + builder.storeFileListener(this.archiver); queue = builder.build(); appender = queue.acquireAppender(); - sampleQueue = new WeightedQueue<>(maxQueueWeight); - this.maxLogSize = maxLogSize; } /** @@ -139,6 +123,7 @@ public class BinLog implements Runnable, StoreFileListener binLogThread.join(); appender = null; queue = null; + archiver.stop(); } /** @@ -226,34 +211,6 @@ public class BinLog implements Runnable, StoreFileListener finalize(); } - /** - * Track store files as they are added and their storage impact. Delete them if over storage limit. - * @param cycle - * @param file - */ - public synchronized void onReleased(int cycle, File file) - { - chronicleStoreFiles.offer(file); - //This isn't accurate because the files are sparse, but it's at least pessimistic - bytesInStoreFiles += file.length(); - logger.debug("Chronicle store file {} rolled file size {}", file.getPath(), file.length()); - while (bytesInStoreFiles > maxLogSize & !chronicleStoreFiles.isEmpty()) - { - File toDelete = chronicleStoreFiles.poll(); - long toDeleteLength = toDelete.length(); - if (!toDelete.delete()) - { - logger.error("Failed to delete chronicle store file: {} store file size: {} bytes in store files: {}. " + - "You will need to clean this up manually or reset full query logging.", - toDelete.getPath(), toDeleteLength, bytesInStoreFiles); - } - else - { - bytesInStoreFiles -= toDeleteLength; - logger.info("Deleted chronicle store file: {} store file size: {} bytes in store files: {} max log size: {}.", file.getPath(), toDeleteLength, bytesInStoreFiles, maxLogSize); - } - } - } /** * There is a race where we might not release a buffer, going to let finalization http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/utils/binlog/BinLogArchiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/binlog/BinLogArchiver.java b/src/java/org/apache/cassandra/utils/binlog/BinLogArchiver.java new file mode 100644 index 0000000..9a6f0bc --- /dev/null +++ b/src/java/org/apache/cassandra/utils/binlog/BinLogArchiver.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.binlog; + +import java.io.File; + +import net.openhft.chronicle.queue.impl.StoreFileListener; + +public interface BinLogArchiver extends StoreFileListener +{ + public void onReleased(int cycle, File file); + public void stop(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/utils/binlog/BinLogOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/binlog/BinLogOptions.java b/src/java/org/apache/cassandra/utils/binlog/BinLogOptions.java new file mode 100644 index 0000000..8005ca3 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/binlog/BinLogOptions.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.binlog; + +import org.apache.commons.lang3.StringUtils; + +public class BinLogOptions +{ + public String archive_command = StringUtils.EMPTY; + /** + * How often to roll BinLog segments so they can potentially be reclaimed. Available options are: + * MINUTELY, HOURLY, DAILY, LARGE_DAILY, XLARGE_DAILY, HUGE_DAILY. + * For more options, refer: net.openhft.chronicle.queue.RollCycles + */ + public String roll_cycle = "HOURLY"; + /** + * Indicates if the BinLog should block if the it falls behind or should drop bin log records. + * Default is set to true so that BinLog records wont be lost + */ + public boolean block = true; + + /** + * Maximum weight of in memory queue for records waiting to be written to the binlog file + * before blocking or dropping the log records. For advanced configurations + */ + public int max_queue_weight = 256 * 1024 * 1024; + + /** + * Maximum size of the rolled files to retain on disk before deleting the oldest file. For advanced configurations. + */ + public long max_log_size = 16L * 1024L * 1024L * 1024L; + + /** + * Limit the number of times to retry a command. + */ + public int max_archive_retries = 10; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/utils/binlog/DeletingArchiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/binlog/DeletingArchiver.java b/src/java/org/apache/cassandra/utils/binlog/DeletingArchiver.java new file mode 100644 index 0000000..3bdbb8f --- /dev/null +++ b/src/java/org/apache/cassandra/utils/binlog/DeletingArchiver.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.binlog; + +import java.io.File; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DeletingArchiver implements BinLogArchiver +{ + private static final Logger logger = LoggerFactory.getLogger(DeletingArchiver.class); + /** + * The files in the chronicle queue that have already rolled + */ + private final Queue<File> chronicleStoreFiles = new ConcurrentLinkedQueue<>(); + private final long maxLogSize; + /** + * The number of bytes in store files that have already rolled + */ + private long bytesInStoreFiles; + + public DeletingArchiver(long maxLogSize) + { + Preconditions.checkArgument(maxLogSize > 0, "maxLogSize must be > 0"); + this.maxLogSize = maxLogSize; + } + + /** + * Track store files as they are added and their storage impact. Delete them if over storage limit. + * @param cycle + * @param file + */ + public synchronized void onReleased(int cycle, File file) + { + chronicleStoreFiles.offer(file); + //This isn't accurate because the files are sparse, but it's at least pessimistic + bytesInStoreFiles += file.length(); + logger.debug("Chronicle store file {} rolled file size {}", file.getPath(), file.length()); + while (bytesInStoreFiles > maxLogSize & !chronicleStoreFiles.isEmpty()) + { + File toDelete = chronicleStoreFiles.poll(); + long toDeleteLength = toDelete.length(); + if (!toDelete.delete()) + { + logger.error("Failed to delete chronicle store file: {} store file size: {} bytes in store files: {}. " + + "You will need to clean this up manually or reset full query logging.", + toDelete.getPath(), toDeleteLength, bytesInStoreFiles); + } + else + { + bytesInStoreFiles -= toDeleteLength; + logger.info("Deleted chronicle store file: {} store file size: {} bytes in store files: {} max log size: {}.", + file.getPath(), toDeleteLength, bytesInStoreFiles, maxLogSize); + } + } + } + + @VisibleForTesting + long getBytesInStoreFiles() + { + return bytesInStoreFiles; + } + + public void stop() + { + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/src/java/org/apache/cassandra/utils/binlog/ExternalArchiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/binlog/ExternalArchiver.java b/src/java/org/apache/cassandra/utils/binlog/ExternalArchiver.java new file mode 100644 index 0000000..e53c5b0 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/binlog/ExternalArchiver.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.binlog; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Longs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Archives binary log files immediately when they are rolled using a configure archive command. + * + * The archive command should be "/path/to/script.sh %path" where %path will be replaced with the file to be archived + */ +public class ExternalArchiver implements BinLogArchiver +{ + private static final Logger logger = LoggerFactory.getLogger(ExternalArchiver.class); + // used to replace %path with the actual file to archive when calling the archive command + private static final Pattern PATH = Pattern.compile("%path"); + private static final long DEFAULT_RETRY_DELAY_MS = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES); + + /** + * use a DelayQueue to simplify retries - we want first tries to be executed immediately and retries should wait DEFAULT_RETRY_DELAY_MS + */ + private final DelayQueue<DelayFile> archiveQueue = new DelayQueue<>(); + private final String archiveCommand; + private final ExecutorService executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("BinLogArchiver")); + private final Path path; + /** + * for testing, to be able to make sure that the command is executed + */ + private final ExecCommand commandExecutor; + private volatile boolean shouldContinue = true; + + public ExternalArchiver(String archiveCommand, Path path, int maxArchiveRetries) + { + this(archiveCommand, path, DEFAULT_RETRY_DELAY_MS, maxArchiveRetries, ExternalArchiver::exec); + } + + @VisibleForTesting + ExternalArchiver(String archiveCommand, Path path, long retryDelayMs, int maxRetries, ExecCommand command) + { + this.archiveCommand = archiveCommand; + this.commandExecutor = command; + // if there are any .cq4 files in path, archive them on startup - this handles any leftover files from crashes etc + archiveExisting(path); + this.path = path; + + executor.execute(() -> { + while (shouldContinue) + { + DelayFile toArchive = null; + try + { + toArchive = archiveQueue.poll(100, TimeUnit.MILLISECONDS); + if (toArchive != null) + archiveFile(toArchive.file); + } + catch (Throwable t) + { + if (toArchive != null) + { + + if (toArchive.retries < maxRetries) + { + logger.error("Got error archiving {}, retrying in {} minutes", toArchive.file, TimeUnit.MINUTES.convert(retryDelayMs, TimeUnit.MILLISECONDS), t); + archiveQueue.add(new DelayFile(toArchive.file, retryDelayMs, TimeUnit.MILLISECONDS, toArchive.retries + 1)); + } + else + { + logger.error("Max retries {} reached for {}, leaving on disk", toArchive.retries, toArchive.file, t); + } + } + else + logger.error("Got error waiting for files to archive", t); + } + } + logger.debug("Exiting archiver thread"); + }); + } + + public void onReleased(int cycle, File file) + { + logger.debug("BinLog file released: {}", file); + archiveQueue.add(new DelayFile(file, 0, TimeUnit.MILLISECONDS, 0)); + } + + /** + * Stops the archiver thread and tries to archive all existing files + * + * this handles the case where a user explicitly disables full/audit log and would expect all log files to be archived + * rolled or not + */ + public void stop() + { + shouldContinue = false; + try + { + // wait for the archiver thread to stop; + executor.submit(() -> {}).get(); + // and try to archive all remaining files before exiting + archiveExisting(path); + } + catch (InterruptedException | ExecutionException e) + { + throw new RuntimeException(e); + } + } + + /** + * Iterates over all files in path, executing the archive command for each. + */ + private void archiveExisting(Path path) + { + if (path == null) + return; + for (File f : path.toFile().listFiles((f) -> f.isFile() && f.getName().endsWith(SingleChronicleQueue.SUFFIX))) + { + try + { + logger.debug("Archiving existing file {}", f); + archiveFile(f); + } + catch (IOException e) + { + logger.error("Got error archiving existing file {}", f, e); + } + } + } + + private void archiveFile(File f) throws IOException + { + String cmd = PATH.matcher(archiveCommand).replaceAll(Matcher.quoteReplacement(f.getAbsolutePath())); + logger.debug("Executing archive command: {}", cmd); + commandExecutor.exec(cmd); + } + + static void exec(String command) throws IOException + { + ProcessBuilder pb = new ProcessBuilder(command.split(" ")); + pb.redirectErrorStream(true); + FBUtilities.exec(pb); + } + + private static class DelayFile implements Delayed + { + public final File file; + private final long delayTime; + private final int retries; + + public DelayFile(File file, long delay, TimeUnit delayUnit, int retries) + { + this.file = file; + this.delayTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(delay, delayUnit); + this.retries = retries; + } + public long getDelay(TimeUnit unit) + { + return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + public int compareTo(Delayed o) + { + DelayFile other = (DelayFile)o; + return Longs.compare(delayTime, other.delayTime); + } + } + + interface ExecCommand + { + public void exec(String command) throws IOException; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java index 5fe078a..525fa8e 100644 --- a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java +++ b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -54,6 +55,7 @@ import org.apache.cassandra.service.QueryState; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.binlog.BinLogTest; +import org.apache.cassandra.utils.binlog.DeletingArchiver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -101,31 +103,31 @@ public class FullQueryLoggerTest extends CQLTester @Test(expected = NullPointerException.class) public void testConfigureNullPath() throws Exception { - instance.configure(null, "", true, 1, 1); + instance.configure(null, "", true, 1, 1, StringUtils.EMPTY, 10); } @Test(expected = NullPointerException.class) public void testConfigureNullRollCycle() throws Exception { - instance.configure(BinLogTest.tempDir(), null, true, 1, 1); + instance.configure(BinLogTest.tempDir(), null, true, 1, 1, StringUtils.EMPTY, 10); } @Test(expected = IllegalArgumentException.class) public void testConfigureInvalidRollCycle() throws Exception { - instance.configure(BinLogTest.tempDir(), "foobar", true, 1, 1); + instance.configure(BinLogTest.tempDir(), "foobar", true, 1, 1, StringUtils.EMPTY, 10); } @Test(expected = IllegalArgumentException.class) public void testConfigureInvalidMaxQueueWeight() throws Exception { - instance.configure(BinLogTest.tempDir(), "DAILY", true, 0, 1); + instance.configure(BinLogTest.tempDir(), "DAILY", true, 0, 1, StringUtils.EMPTY, 10); } @Test(expected = IllegalArgumentException.class) public void testConfigureInvalidMaxQueueLogSize() throws Exception { - instance.configure(BinLogTest.tempDir(), "DAILY", true, 1, 0); + instance.configure(BinLogTest.tempDir(), "DAILY", true, 1, 0, StringUtils.EMPTY, 10); } @Test(expected = IllegalArgumentException.class) @@ -133,7 +135,7 @@ public class FullQueryLoggerTest extends CQLTester { File f = FileUtils.createTempFile("foo", "bar"); f.deleteOnExit(); - instance.configure(f.toPath(), "TEST_SECONDLY", true, 1, 1); + instance.configure(f.toPath(), "TEST_SECONDLY", true, 1, 1, StringUtils.EMPTY, 10); } @Test(expected = IllegalArgumentException.class) @@ -344,7 +346,7 @@ public class FullQueryLoggerTest extends CQLTester @Test public void testNonBlocking() throws Exception { - instance.configure(tempDir, "TEST_SECONDLY", false, 1, 1024 * 1024 * 256); + instance.configure(tempDir, "TEST_SECONDLY", false, 1, 1024 * 1024 * 256, StringUtils.EMPTY, 10); //Prevent the bin log thread from making progress, causing the task queue to refuse tasks Semaphore blockBinLog = new Semaphore(0); try @@ -689,7 +691,7 @@ public class FullQueryLoggerTest extends CQLTester private void configureFQL() throws Exception { - instance.configure(tempDir, "TEST_SECONDLY", true, 1, 1024 * 1024 * 256); + instance.configure(tempDir, "TEST_SECONDLY", true, 1, 1024 * 1024 * 256, StringUtils.EMPTY, 10); } private void logQuery(String query) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java index 782e3b1..f23e909 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java @@ -126,6 +126,8 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.audit.BinLogAuditLogger", "org.apache.cassandra.audit.FullQueryLogger", "org.apache.cassandra.audit.AuditLogOptions", + "org.apache.cassandra.utils.binlog.BinLogOptions", + "org.apache.cassandra.audit.FullQueryLoggerOptions", // generated classes "org.apache.cassandra.config.ConfigBeanInfo", "org.apache.cassandra.config.ConfigCustomizer", http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java index d564e41..76c42f2 100644 --- a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java +++ b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java @@ -36,6 +36,8 @@ import net.openhft.chronicle.queue.ExcerptTailer; import net.openhft.chronicle.queue.RollCycles; import net.openhft.chronicle.wire.WireOut; import org.apache.cassandra.Util; +import org.apache.cassandra.audit.AuditLogOptions; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.FileUtils; import static org.junit.Assert.assertEquals; @@ -63,7 +65,7 @@ public class BinLogTest public void setUp() throws Exception { path = tempDir(); - binLog = new BinLog(path, RollCycles.TEST_SECONDLY, 10, 1024 * 1024 * 128); + binLog = new BinLog(path, RollCycles.TEST_SECONDLY, 10, new DeletingArchiver(1024 * 1024 * 128)); binLog.start(); } @@ -83,25 +85,25 @@ public class BinLogTest @Test(expected = NullPointerException.class) public void testConstructorNullPath() throws Exception { - new BinLog(null, RollCycles.TEST_SECONDLY, 1, 1); + new BinLog(null, RollCycles.TEST_SECONDLY, 1, new DeletingArchiver(1)); } @Test(expected = NullPointerException.class) public void testConstructorNullRollCycle() throws Exception { - new BinLog(tempDir(), null, 1, 1); + new BinLog(tempDir(), null, 1, new DeletingArchiver(1)); } @Test(expected = IllegalArgumentException.class) public void testConstructorZeroWeight() throws Exception { - new BinLog(tempDir(), RollCycles.TEST_SECONDLY, 0, 1); + new BinLog(tempDir(), RollCycles.TEST_SECONDLY, 0, new DeletingArchiver(1)); } @Test(expected = IllegalArgumentException.class) public void testConstructorLogSize() throws Exception { - new BinLog(tempDir(), RollCycles.TEST_SECONDLY, 1, 0); + new BinLog(tempDir(), RollCycles.TEST_SECONDLY, 0, new DeletingArchiver(1)); } /** @@ -345,7 +347,7 @@ public class BinLogTest public void testCleanupOnOversize() throws Exception { tearDown(); - binLog = new BinLog(path, RollCycles.TEST_SECONDLY, 10000, 1); + binLog = new BinLog(path, RollCycles.TEST_SECONDLY, 1, new DeletingArchiver(10000)); binLog.start(); for (int ii = 0; ii < 5; ii++) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/test/unit/org/apache/cassandra/utils/binlog/DeletingArchiverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/binlog/DeletingArchiverTest.java b/test/unit/org/apache/cassandra/utils/binlog/DeletingArchiverTest.java new file mode 100644 index 0000000..cd6b7a3 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/binlog/DeletingArchiverTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.binlog; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class DeletingArchiverTest +{ + @Test + public void testDelete() throws IOException + { + DeletingArchiver da = new DeletingArchiver(45); + List<File> files = generateFiles(10, 5); + for (File f : files) + da.onReleased(1, f); + // adding 5 files, each with size 10, this means the first one should have been deleted: + assertFalse(files.get(0).exists()); + for (int i = 1; i < files.size(); i++) + assertTrue(files.get(i).exists()); + assertEquals(40, da.getBytesInStoreFiles()); + } + + @Test + public void testArchiverBigFile() throws IOException + { + DeletingArchiver da = new DeletingArchiver(45); + List<File> largeFiles = generateFiles(50, 1); + da.onReleased(1, largeFiles.get(0)); + assertFalse(largeFiles.get(0).exists()); + assertEquals(0, da.getBytesInStoreFiles()); + } + + @Test + public void testArchiverSizeTracking() throws IOException + { + DeletingArchiver da = new DeletingArchiver(45); + List<File> smallFiles = generateFiles(10, 4); + List<File> largeFiles = generateFiles(40, 1); + + for (File f : smallFiles) + { + da.onReleased(1, f); + } + assertEquals(40, da.getBytesInStoreFiles()); + // we now have 40 bytes in deleting archiver, adding the large 40 byte file should delete all the small ones + da.onReleased(1, largeFiles.get(0)); + for (File f : smallFiles) + assertFalse(f.exists()); + + smallFiles = generateFiles(10, 4); + + // make sure that size tracking is ok - all 4 new small files should still be there and the large one should be gone + for (File f : smallFiles) + da.onReleased(1, f); + + assertFalse(largeFiles.get(0).exists()); + for (File f : smallFiles) + assertTrue(f.exists()); + assertEquals(40, da.getBytesInStoreFiles()); + } + + + private List<File> generateFiles(int size, int count) throws IOException + { + Random r = new Random(); + List<File> files = new ArrayList<>(count); + byte [] content = new byte[size]; + r.nextBytes(content); + + for (int i = 0; i < count; i++) + { + Path p = Files.createTempFile("logfile", ".cq4"); + Files.write(p, content); + files.add(p.toFile()); + } + files.forEach(File::deleteOnExit); + return files; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e00ab95/test/unit/org/apache/cassandra/utils/binlog/ExternalArchiverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/binlog/ExternalArchiverTest.java b/test/unit/org/apache/cassandra/utils/binlog/ExternalArchiverTest.java new file mode 100644 index 0000000..284ff5a --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/binlog/ExternalArchiverTest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.binlog; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.Sets; +import org.junit.Test; + +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; +import org.apache.cassandra.utils.Pair; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ExternalArchiverTest +{ + @Test + public void testArchiver() throws IOException, InterruptedException + { + Pair<String, String> s = createScript(); + String script = s.left; + String dir = s.right; + Path logdirectory = Files.createTempDirectory("logdirectory"); + File logfileToArchive = Files.createTempFile(logdirectory, "logfile", "xyz").toFile(); + Files.write(logfileToArchive.toPath(), "content".getBytes()); + + ExternalArchiver ea = new ExternalArchiver(script+" %path", null, 10); + ea.onReleased(1, logfileToArchive); + while (logfileToArchive.exists()) + { + Thread.sleep(100); + } + + File movedFile = new File(dir, logfileToArchive.getName()); + assertTrue(movedFile.exists()); + movedFile.deleteOnExit(); + ea.stop(); + assertEquals(0, logdirectory.toFile().listFiles().length); + } + + @Test + public void testArchiveExisting() throws IOException, InterruptedException + { + Pair<String, String> s = createScript(); + String script = s.left; + String moveDir = s.right; + List<File> existingFiles = new ArrayList<>(); + Path dir = Files.createTempDirectory("archive"); + for (int i = 0; i < 10; i++) + { + File logfileToArchive = Files.createTempFile(dir, "logfile", SingleChronicleQueue.SUFFIX).toFile(); + logfileToArchive.deleteOnExit(); + Files.write(logfileToArchive.toPath(), ("content"+i).getBytes()); + existingFiles.add(logfileToArchive); + } + + ExternalArchiver ea = new ExternalArchiver(script + " %path", dir, 10); + boolean allGone = false; + while (!allGone) + { + allGone = true; + for (File f : existingFiles) + { + if (f.exists()) + { + allGone = false; + Thread.sleep(100); + break; + } + File movedFile = new File(moveDir, f.getName()); + assertTrue(movedFile.exists()); + movedFile.deleteOnExit(); + } + } + ea.stop(); + assertEquals(0, dir.toFile().listFiles().length); + } + + @Test + public void testArchiveOnShutdown() throws IOException, InterruptedException + { + Pair<String, String> s = createScript(); + String script = s.left; + String moveDir = s.right; + Path dir = Files.createTempDirectory("archive"); + ExternalArchiver ea = new ExternalArchiver(script + " %path", dir, 10); + List<File> existingFiles = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { + File logfileToArchive = Files.createTempFile(dir, "logfile", SingleChronicleQueue.SUFFIX).toFile(); + logfileToArchive.deleteOnExit(); + Files.write(logfileToArchive.toPath(), ("content"+i).getBytes()); + existingFiles.add(logfileToArchive); + } + // ea.stop will archive all .cq4 files in the directory + ea.stop(); + for (File f : existingFiles) + { + assertFalse(f.exists()); + File movedFile = new File(moveDir, f.getName()); + assertTrue(movedFile.exists()); + movedFile.deleteOnExit(); + } + } + + /** + * Make sure retries work + * 1. create a script that will fail two times before executing the command + * 2. create an ExternalArchiver that retries two times (this means we execute the script 3 times, meaning the last one will be successful) + * 3. make sure the file is on disk until the script has been executed 3 times + * 4. make sure the file is gone and that the command was executed successfully + */ + @Test + public void testRetries() throws IOException, InterruptedException + { + Pair<String, String> s = createFailingScript(2); + String script = s.left; + String moveDir = s.right; + Path logdirectory = Files.createTempDirectory("logdirectory"); + File logfileToArchive = Files.createTempFile(logdirectory, "logfile", "xyz").toFile(); + Files.write(logfileToArchive.toPath(), "content".getBytes()); + AtomicInteger tryCounter = new AtomicInteger(); + AtomicBoolean success = new AtomicBoolean(); + ExternalArchiver ea = new ExternalArchiver(script + " %path", null, 1000, 2, (cmd) -> + { + tryCounter.incrementAndGet(); + ExternalArchiver.exec(cmd); + success.set(true); + }); + ea.onReleased(0, logfileToArchive); + while (tryCounter.get() < 2) // while we have only executed this 0 or 1 times, the file should still be on disk + { + Thread.sleep(100); + assertTrue(logfileToArchive.exists()); + } + + while (!success.get()) + Thread.sleep(100); + + // there will be 3 attempts in total, 2 failing ones, then the successful one: + assertEquals(3, tryCounter.get()); + assertFalse(logfileToArchive.exists()); + File movedFile = new File(moveDir, logfileToArchive.getName()); + assertTrue(movedFile.exists()); + ea.stop(); + } + + + /** + * Makes sure that max retries is honored + * + * 1. create a script that will fail 3 times before actually executing the command + * 2. create an external archiver that retries 2 times (this means that the script will get executed 3 times) + * 3. make sure the file is still on disk and that we have not successfully executed the script + * + */ + @Test + public void testMaxRetries() throws IOException, InterruptedException + { + Pair<String, String> s = createFailingScript(3); + String script = s.left; + String moveDir = s.right; + Path logdirectory = Files.createTempDirectory("logdirectory"); + File logfileToArchive = Files.createTempFile(logdirectory, "logfile", "xyz").toFile(); + Files.write(logfileToArchive.toPath(), "content".getBytes()); + + AtomicInteger tryCounter = new AtomicInteger(); + AtomicBoolean success = new AtomicBoolean(); + ExternalArchiver ea = new ExternalArchiver(script + " %path", null, 1000, 2, (cmd) -> + { + try + { + ExternalArchiver.exec(cmd); + success.set(true); + } + catch (Throwable t) + { + tryCounter.incrementAndGet(); + throw t; + } + }); + ea.onReleased(0, logfileToArchive); + while (tryCounter.get() < 3) + Thread.sleep(500); + assertTrue(logfileToArchive.exists()); + // and the file should not get moved: + Thread.sleep(5000); + assertTrue(logfileToArchive.exists()); + assertFalse(success.get()); + File [] fs = new File(moveDir).listFiles(f -> + { + if (f.getName().startsWith("file.")) + { + f.deleteOnExit(); + return true; + } + throw new AssertionError("There should be no other files in the directory"); + }); + assertEquals(3, fs.length); // maxRetries + the first try + ea.stop(); + } + + + private Pair<String, String> createScript() throws IOException + { + File f = Files.createTempFile("script", "", PosixFilePermissions.asFileAttribute(Sets.newHashSet(PosixFilePermission.OWNER_WRITE, + PosixFilePermission.OWNER_READ, + PosixFilePermission.OWNER_EXECUTE))).toFile(); + f.deleteOnExit(); + File dir = Files.createTempDirectory("archive").toFile(); + dir.deleteOnExit(); + String script = "#!/bin/sh\nmv $1 "+dir.getAbsolutePath(); + Files.write(f.toPath(), script.getBytes()); + return Pair.create(f.getAbsolutePath(), dir.getAbsolutePath()); + } + + private Pair<String, String> createFailingScript(int failures) throws IOException + { + File f = Files.createTempFile("script", "", PosixFilePermissions.asFileAttribute(Sets.newHashSet(PosixFilePermission.OWNER_WRITE, + PosixFilePermission.OWNER_READ, + PosixFilePermission.OWNER_EXECUTE))).toFile(); + f.deleteOnExit(); + File dir = Files.createTempDirectory("archive").toFile(); + dir.deleteOnExit(); + // this script counts files in dir.getAbsolutePath, then if there are more than failures files in there, it moves the actual file + String script = "#!/bin/bash%n" + + "DIR=%s%n" + + "shopt -s nullglob%n" + + "numfiles=($DIR/*)%n" + + "numfiles=${#numfiles[@]}%n" + + "if (( $numfiles < %d )); then%n" + + " mktemp $DIR/file.XXXXX%n" + + " exit 1%n" + + "else%n" + + " mv $1 $DIR%n"+ + "fi%n"; + + Files.write(f.toPath(), String.format(script, dir.getAbsolutePath(), failures).getBytes()); + return Pair.create(f.getAbsolutePath(), dir.getAbsolutePath()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
