This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c2ecde  check that the WAL directory and log files can sync (#1410)
7c2ecde is described below

commit 7c2ecde0c1001cf7f03fa31841e4c3ac0a8036dc
Author: etseidl <etse...@users.noreply.github.com>
AuthorDate: Wed Nov 6 17:44:18 2019 -0800

    check that the WAL directory and log files can sync (#1410)
    
    check that the WAL directory and log files can sync, both at
    tserver startup and log file creation
---
 .../apache/accumulo/server/fs/VolumeManager.java   |  3 +++
 .../accumulo/server/fs/VolumeManagerImpl.java      | 23 ++++++++++++++++++++++
 .../org/apache/accumulo/tserver/TabletServer.java  | 22 +++++++++++++++++++++
 .../org/apache/accumulo/tserver/log/DfsLogger.java | 12 ++++++++---
 4 files changed, 57 insertions(+), 3 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
index 9b5a0f8..0260c9b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
@@ -166,6 +166,9 @@ public interface VolumeManager {
   // decide on which of the given locations to create a new file
   String choose(VolumeChooserEnvironment env, String[] options);
 
+  // are sync and flush supported for the given path
+  boolean canSyncAndFlush(Path path);
+
   /**
    * Fetch the default Volume
    */
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index a9ce173..38f0b6a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -496,6 +497,28 @@ public class VolumeManagerImpl implements VolumeManager {
   }
 
   @Override
+  public boolean canSyncAndFlush(Path path) {
+    // the assumption is all filesystems support sync/flush except
+    // for HDFS erasure coding. not checking hdfs config options
+    // since that's already checked in ensureSyncIsEnabled()
+    FileSystem fs = getVolumeByPath(path).getFileSystem();
+    if (fs instanceof DistributedFileSystem) {
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        ErasureCodingPolicy currEC = dfs.getErasureCodingPolicy(path);
+        if (currEC != null && !currEC.isReplicationPolicy()) {
+          return false;
+        }
+      } catch (IOException e) {
+        // don't spam warnings...if dir doesn't exist or not EC
+        // we don't really care if the above failed
+        log.debug("exception getting EC policy for " + path, e);
+      }
+    }
+    return true;
+  }
+
+  @Override
   public Volume getDefaultVolume() {
     return defaultVolume;
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index ec8c636..ee87c0c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -162,6 +162,7 @@ import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory.RateProv
 import org.apache.accumulo.fate.util.LoggingRunnable;
 import org.apache.accumulo.fate.util.Retry;
 import org.apache.accumulo.fate.util.Retry.RetryFactory;
+import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
@@ -171,6 +172,7 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.TabletLevel;
@@ -179,6 +181,8 @@ import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeChooserEnvironment;
+import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.log.SortedLogState;
@@ -372,6 +376,10 @@ public class TabletServer extends AbstractServer {
     final long logBusyTabletsDelay =
         aconf.getTimeInMillis(Property.TSERV_LOG_BUSY_TABLETS_INTERVAL);
 
+    // check early whether the WAL directory supports sync. issue warning if
+    // it doesn't
+    checkWalCanSync(context);
+
     // This thread will calculate and log out the busiest tablets based on ingest count and
     // query count every #{logBusiestTabletsDelay}
     if (numBusyTabletsToLog > 0) {
@@ -3002,6 +3010,20 @@ public class TabletServer extends AbstractServer {
     }
   }
 
+  private void checkWalCanSync(ServerContext context) {
+    VolumeChooserEnvironment chooserEnv =
+        new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.ChooserScope.LOGGER, context);
+    String logPath = fs.choose(chooserEnv, ServerConstants.getBaseUris(context)) + Path.SEPARATOR
+        + ServerConstants.WAL_DIR;
+    if (!fs.canSyncAndFlush(new Path(logPath))) {
+      // sleep a few seconds in case this is at cluster start...give monitor
+      // time to start so the warning will be more visible
+      UtilWaitThread.sleep(5000);
+      log.warn("WAL directory ({}) implementation does not support sync or flush."
+          + " Data loss may occur.", logPath);
+    }
+  }
+
   private void config() {
     log.info("Tablet server starting on {}", getHostname());
     majorCompactorThread =
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 46a119d..d5d2031 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -431,17 +431,23 @@ public class DfsLogger implements Comparable<DfsLogger> {
     metaReference = toString();
     LoggerOperation op = null;
     try {
+      Path logfilePath = new Path(logPath);
       short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
       if (replication == 0)
-        replication = fs.getDefaultReplication(new Path(logPath));
+        replication = fs.getDefaultReplication(logfilePath);
       long blockSize = getWalBlockSize(conf.getConfiguration());
       if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
-        logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize);
+        logFile = fs.createSyncable(logfilePath, 0, replication, blockSize);
       else
-        logFile = fs.create(new Path(logPath), true, 0, replication, blockSize);
+        logFile = fs.create(logfilePath, true, 0, replication, blockSize);
       sync = logFile.getClass().getMethod("hsync");
       flush = logFile.getClass().getMethod("hflush");
 
+      // check again that logfile can be sync'd
+      if (!fs.canSyncAndFlush(logfilePath)) {
+        log.warn("sync not supported for log file {}. Data loss may occur.", logPath);
+      }
+
       // Initialize the log file with a header and its encryption
       CryptoService cryptoService = context.getCryptoService();
       logFile.write(LOG_FILE_HEADER_V4.getBytes(UTF_8));

Reply via email to