Author: liyin
Date: Thu Apr 24 18:18:29 2014
New Revision: 1589817

URL: http://svn.apache.org/r1589817
Log:
[master] Fail split task when split file cannot be closed
Author: fan Summary: With some minor code cleanup Test Plan: new unit test testUnableToCloseSplitFile Reviewers: jiqingt, liyintang Reviewed By: jiqingt CC: hbase-eng@ Differential Revision: https://phabricator.fb.com/D1290752 Task ID: 4185227 Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1589817&r1=1589816&r2=1589817&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Thu Apr 24 18:18:29 2014 @@ -83,12 +83,12 @@ public class SplitLogWorker implements R private final TaskExecutor executor; private long zkretries; - private Object taskReadyLock = new Object(); + private final Object taskReadyLock = new Object(); volatile int taskReadySeq = 0; private volatile String currentTask = null; private int currentVersion; private volatile boolean exitWorker; - private Object grabTaskLock = new Object(); + private final Object grabTaskLock = new Object(); private boolean workerInGrabTask = false; protected ZooKeeperWrapper watcher; private static int numWorkers = 0; @@ -156,12 +156,11 @@ public class SplitLogWorker implements R } String tmpname = ZKSplitLog.getSplitLogDirTmpComponent( workerName, filename); - if 
(HLogSplitter.splitLogFileToTemp(rootdir, tmpname, - st, fs, conf, p, logCloseThreadPool, masterRef.get()) == false) { + if (!HLogSplitter.splitLogFileToTemp(rootdir, tmpname, + st, fs, conf, p, logCloseThreadPool, masterRef.get())) { t1 = System.currentTimeMillis(); timingInfo.append("splitLogFileToTemp took " + (t1-t0) + " ms. "); - t0 = t1; return Status.PREEMPTED; } @@ -365,7 +364,6 @@ public class SplitLogWorker implements R Thread.interrupted(); } } - return; } /** @@ -437,7 +435,6 @@ public class SplitLogWorker implements R LOG.warn("failed to end task, " + path + " " + ts, e); } tot_wkr_final_transistion_failed.incrementAndGet(); - return; } void getDataSetWatchAsync() { @@ -570,7 +567,6 @@ public class SplitLogWorker implements R worker = new Thread(null, this, "SplitLogWorker-" + workerName); exitWorker = false; worker.start(); - return; } /** @@ -598,7 +594,6 @@ public class SplitLogWorker implements R return; } getDataSetWatchSuccess(path, newData); - return; } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1589817&r1=1589816&r2=1589817&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Thu Apr 24 18:18:29 2014 @@ -19,27 +19,7 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.net.ConnectException; -import java.text.ParseException; -import 
java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicReference; - +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -48,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.RemoteExceptionHandler; -import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; @@ -61,7 +40,24 @@ import org.apache.hadoop.hbase.util.Envi import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import com.google.common.base.Preconditions; +import java.io.EOFException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease; /** * This class is responsible for splitting up a bunch of regionserver commit log @@ -97,48 +93,6 @@ public class HLogSplitter { private 
MonitoredTask status; - - /** - * Create a new HLogSplitter using the given {@link Configuration} and the - * <code>hbase.hlog.splitter.impl</code> property to derived the instance - * class to use. - * - * @param rootDir hbase directory - * @param srcDir logs directory - * @param oldLogDir directory where processed logs are archived to - * @param logfiles the list of log files to split - */ - public static HLogSplitter createLogSplitter(Configuration conf, - final Path rootDir, final Path srcDir, - Path oldLogDir, final FileSystem fs) { - - @SuppressWarnings("unchecked") - Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf - .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class); - try { - Constructor<? extends HLogSplitter> constructor = - splitterClass.getConstructor( - Configuration.class, // conf - Path.class, // rootDir - Path.class, // srcDir - Path.class, // oldLogDir - FileSystem.class); // fs - return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs); - } catch (IllegalArgumentException e) { - throw new RuntimeException(e); - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - throw new RuntimeException(e); - } catch (SecurityException e) { - throw new RuntimeException(e); - } catch (NoSuchMethodException e) { - throw new RuntimeException(e); - } - } - public HLogSplitter(Configuration conf, Path rootDir, Path srcDir, Path oldLogDir, FileSystem fs, ExecutorService logCloseThreadPool, HMasterRegionInterface master) { @@ -320,7 +274,7 @@ public class HLogSplitter { t0 = System.currentTimeMillis(); int n = 0; - List<Future<Void>> closeResults = new ArrayList<Future<Void>>(); + List<Future<Void>> closeResults = new ArrayList<>(); for (Object o : logWriters.values()) { long t2 = EnvironmentEdgeManager.currentTimeMillis(); if ((t2 - last_report_at) > period) { @@ -340,12 +294,13 @@ public 
class HLogSplitter { Future<Void> closeResult = logCloseThreadPool.submit(new Callable<Void>() { @Override - public Void call() { + public Void call() throws IOException { try { wap.w.close(); } catch (IOException ioe) { LOG.warn("Failed to close recovered edits writer " + wap.p, ioe); + throw ioe; } LOG.debug("Closed " + wap.p); return null; @@ -394,10 +349,7 @@ public class HLogSplitter { LOG.debug(timingInfo); status.markComplete(msg); } - if (progress_failed) { - return false; - } - return true; + return !progress_failed; } /** @@ -488,7 +440,7 @@ public class HLogSplitter { * @param conf * @return A new Reader instance * @throws IOException - * @throws CorruptedLogFile + * @throws CorruptedLogFileException */ protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf, boolean skipErrors) Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java?rev=1589817&r1=1589816&r2=1589817&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java Thu Apr 24 18:18:29 2014 @@ -26,12 +26,23 @@ import org.apache.hadoop.hbase.util.Byte public class InstrumentedSequenceFileLogWriter extends SequenceFileLogWriter { public static boolean activateFailure = false; + public static boolean activateCloseIOE = false; + + @Override + public void append(HLog.Entry entry) throws IOException { + super.append(entry); + if (activateFailure && Bytes.equals(entry.getKey().getRegionName(), "break".getBytes())) { + System.out.println(getClass().getName() + ": I will throw an 
exception now..."); + throw(new IOException("This exception is instrumented and should only be thrown for testing")); + } + } + @Override - public void append(HLog.Entry entry) throws IOException { - super.append(entry); - if (activateFailure && Bytes.equals(entry.getKey().getRegionName(), "break".getBytes())) { - System.out.println(getClass().getName() + ": I will throw an exception now..."); - throw(new IOException("This exception is instrumented and should only be thrown for testing")); - } + public void close() throws IOException { + if (activateCloseIOE) { + throw new IOException("Instrumented IOException when closing the writer"); + } else { + super.close(); } + } } Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1589817&r1=1589816&r2=1589817&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Thu Apr 24 18:18:29 2014 @@ -19,19 +19,6 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -58,6 +45,19 @@ import org.junit.Before; import org.junit.BeforeClass; import 
org.junit.Test; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * */ @@ -820,6 +820,43 @@ public class TestHLogSplit { assertEquals(1, fs.listStatus(corruptDir).length); } + @Test + public void testUnableToCloseSplitFile() throws IOException { + regions.clear(); + String region = "testUnableToCloseSplitFile"; + regions.add(region); + generateHLogs(-1); + + fs.initialize(fs.getUri(), conf); + FileStatus logfile = fs.listStatus(hlogDir)[0]; + InstrumentedSequenceFileLogWriter.activateCloseIOE = true; + // Log split should fail because writer cannot be closed + boolean hasIOE = false; + try { + HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf, + reporter, logCloseThreadPool, null); + } catch (IOException ioe) { + LOG.debug("Expected IOE because the log cannot be closed"); + hasIOE = true; + } + assertTrue(hasIOE); + + InstrumentedSequenceFileLogWriter.activateCloseIOE = false; + // Log split succeeds this time + assertTrue(HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf, + reporter, logCloseThreadPool, null)); + + HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir, + logfile.getPath().toString(), conf); + + Path originalLog = (fs.listStatus(oldLogDir))[0].getPath(); + originalLog = (fs.listStatus(originalLog))[0].getPath(); + Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, region); + + LOG.debug("Original log path " + originalLog + " , split log path " + splitLog); + assertTrue(logsAreEqual(originalLog, splitLog)); + } + private void flushToConsole(String s) { System.out.println(s); System.out.flush();
