This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new 7c2ecde check that the WAL directory and log files can sync (#1410) 7c2ecde is described below commit 7c2ecde0c1001cf7f03fa31841e4c3ac0a8036dc Author: etseidl <etse...@users.noreply.github.com> AuthorDate: Wed Nov 6 17:44:18 2019 -0800 check that the WAL directory and log files can sync (#1410) check that the WAL directory and log files can sync, both at tserver startup and log file creation --- .../apache/accumulo/server/fs/VolumeManager.java | 3 +++ .../accumulo/server/fs/VolumeManagerImpl.java | 23 ++++++++++++++++++++++ .../org/apache/accumulo/tserver/TabletServer.java | 22 +++++++++++++++++++++ .../org/apache/accumulo/tserver/log/DfsLogger.java | 12 ++++++++--- 4 files changed, 57 insertions(+), 3 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java index 9b5a0f8..0260c9b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java @@ -166,6 +166,9 @@ public interface VolumeManager { // decide on which of the given locations to create a new file String choose(VolumeChooserEnvironment env, String[] options); + // are sync and flush supported for the given path + boolean canSyncAndFlush(Path path); + /** * Fetch the default Volume */ diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index a9ce173..38f0b6a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -52,6 +52,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.permission.FsPermission; import 
org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -496,6 +497,28 @@ public class VolumeManagerImpl implements VolumeManager { } @Override + public boolean canSyncAndFlush(Path path) { + // the assumption is all filesystems support sync/flush except + // for HDFS erasure coding. not checking hdfs config options + // since that's already checked in ensureSyncIsEnabled() + FileSystem fs = getVolumeByPath(path).getFileSystem(); + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + ErasureCodingPolicy currEC = dfs.getErasureCodingPolicy(path); + if (currEC != null && !currEC.isReplicationPolicy()) { + return false; + } + } catch (IOException e) { + // don't spam warnings...if dir doesn't exist or not EC + // we don't really care if the above failed + log.debug("exception getting EC policy for " + path, e); + } + } + return true; + } + + @Override public Volume getDefaultVolume() { return defaultVolume; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index ec8c636..ee87c0c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -162,6 +162,7 @@ import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory.RateProv import org.apache.accumulo.fate.util.LoggingRunnable; import org.apache.accumulo.fate.util.Retry; import org.apache.accumulo.fate.util.Retry.RetryFactory; +import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.fate.zookeeper.ZooCache; import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; @@ 
-171,6 +172,7 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.GarbageCollectionLogger; +import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.TabletLevel; @@ -179,6 +181,8 @@ import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.data.ServerMutation; import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.fs.VolumeChooserEnvironment; +import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManager.FileType; import org.apache.accumulo.server.log.SortedLogState; @@ -372,6 +376,10 @@ public class TabletServer extends AbstractServer { final long logBusyTabletsDelay = aconf.getTimeInMillis(Property.TSERV_LOG_BUSY_TABLETS_INTERVAL); + // check early whether the WAL directory supports sync. 
issue warning if + // it doesn't + checkWalCanSync(context); + // This thread will calculate and log out the busiest tablets based on ingest count and // query count every #{logBusiestTabletsDelay} if (numBusyTabletsToLog > 0) { @@ -3002,6 +3010,20 @@ public class TabletServer extends AbstractServer { } } + private void checkWalCanSync(ServerContext context) { + VolumeChooserEnvironment chooserEnv = + new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.ChooserScope.LOGGER, context); + String logPath = fs.choose(chooserEnv, ServerConstants.getBaseUris(context)) + Path.SEPARATOR + + ServerConstants.WAL_DIR; + if (!fs.canSyncAndFlush(new Path(logPath))) { + // sleep a few seconds in case this is at cluster start...give monitor + // time to start so the warning will be more visible + UtilWaitThread.sleep(5000); + log.warn("WAL directory ({}) implementation does not support sync or flush." + + " Data loss may occur.", logPath); + } + } + private void config() { log.info("Tablet server starting on {}", getHostname()); majorCompactorThread = diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 46a119d..d5d2031 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -431,17 +431,23 @@ public class DfsLogger implements Comparable<DfsLogger> { metaReference = toString(); LoggerOperation op = null; try { + Path logfilePath = new Path(logPath); short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION); if (replication == 0) - replication = fs.getDefaultReplication(new Path(logPath)); + replication = fs.getDefaultReplication(logfilePath); long blockSize = getWalBlockSize(conf.getConfiguration()); if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC)) - logFile = fs.createSyncable(new 
Path(logPath), 0, replication, blockSize); + logFile = fs.createSyncable(logfilePath, 0, replication, blockSize); else - logFile = fs.create(new Path(logPath), true, 0, replication, blockSize); + logFile = fs.create(logfilePath, true, 0, replication, blockSize); sync = logFile.getClass().getMethod("hsync"); flush = logFile.getClass().getMethod("hflush"); + // check again that logfile can be sync'd + if (!fs.canSyncAndFlush(logfilePath)) { + log.warn("sync not supported for log file {}. Data loss may occur.", logPath); + } + // Initialize the log file with a header and its encryption CryptoService cryptoService = context.getCryptoService(); logFile.write(LOG_FILE_HEADER_V4.getBytes(UTF_8));