Author: dhruba Date: Tue Oct 9 16:48:46 2007 New Revision: 583323 URL: http://svn.apache.org/viewvc?rev=583323&view=rev Log: HADOOP-1942. Increase the concurrency of transaction logging to edits log. Reduce the number of syncs by double-buffering the changes to the transaction log. (Dhruba Borthakur)
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java (with props) lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java (with props) Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=583323&r1=583322&r2=583323&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 9 16:48:46 2007 @@ -362,6 +362,10 @@ HADOOP-1971. Warn when job does not specify a jar. (enis via cutting) + HADOOP-1942. Increase the concurrency of transaction logging to + edits log. Reduce the number of syncs by double-buffering the changes + to the transaction log. (Dhruba Borthakur) + Release 0.14.2 - 2007-10-09 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=583323&r1=583322&r2=583323&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Tue Oct 9 16:48:46 2007 @@ -20,6 +20,8 @@ import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; import java.io.EOFException; import java.io.File; import java.io.FileInputStream; @@ -27,6 +29,7 @@ import java.io.IOException; import java.util.ArrayList; import java.lang.Math; +import java.nio.channels.FileChannel; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.UTF8; @@ -45,33 +48,167 @@ //the following two are used only for backword compatibility : @Deprecated private static final byte OP_DATANODE_ADD = 5; @Deprecated private static final byte OP_DATANODE_REMOVE = 6; + private static int sizeFlushBuffer = 512*1024; private ArrayList<EditLogOutputStream> editStreams = null; private FSImage fsimage = null; - private long lastModificationTime; - private long lastSyncTime; - - static class EditLogOutputStream extends DataOutputStream { + // a monotonically increasing counter that represents transactionIds. + private long txid = 0; + + // stores the last synced transactionId. + private long synctxid = 0; + + // the time of printing the statistics to the log file. + private long lastPrintTime; + + // is a sync currently running? + private boolean isSyncRunning; + + // these are statistics counters. + private long numTransactions; // number of transactions + private long totalTimeTransactions; // total time for all transactions + private NameNodeMetrics metrics; + + private static class TransactionId { + public long txid; + + TransactionId(long value) { + this.txid = value; + } + } + + // stores the most current transactionId of this thread. + private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() { + protected synchronized TransactionId initialValue() { + return new TransactionId(Long.MAX_VALUE); + } + }; + + static class EditLogOutputStream { + private FileChannel fc; + private FileOutputStream fp; + private DataOutputStream od; + private DataOutputStream od1; + private DataOutputStream od2; + private ByteArrayOutputStream buf1; + private ByteArrayOutputStream buf2; + private int bufSize; + + // these are statistics counters + private long numSync; // number of syncs to disk + private long totalTimeSync; // total time to sync + EditLogOutputStream(File name) throws IOException { - super(new FileOutputStream(name, true)); // open for append + bufSize = sizeFlushBuffer; + buf1 = new ByteArrayOutputStream(bufSize); + buf2 = new ByteArrayOutputStream(bufSize); + od1 = new DataOutputStream(buf1); + od2 = new DataOutputStream(buf2); + od = od1; // start with first buffer + fp = new FileOutputStream(name, true); // open for append + fc = fp.getChannel(); + numSync = totalTimeSync = 0; + } + + // returns the current output stream + DataOutputStream getOutputStream() { + return od; } void flushAndSync() throws IOException { - ((FileOutputStream)out).getChannel().force(true); + this.flush(); + fc.force(true); } void create() throws IOException { - ((FileOutputStream)out).getChannel().truncate(0); - writeInt(FSConstants.LAYOUT_VERSION); + fc.truncate(0); + od.writeInt(FSConstants.LAYOUT_VERSION); flushAndSync(); } + + // flush current buffer + private void flush() throws IOException { + ByteArrayOutputStream buf = getBuffer(); + if (buf.size() == 0) { + return; // no data to flush + } + buf.writeTo(fp); // write data to file + buf.reset(); // erase all data in buf + } + + void close() throws IOException { + // close should have been called after all pending transactions + // have been flushed & synced. + if (getBufSize() != 0) { + throw new IOException("FSEditStream has " + getBufSize() + + " bytes still to be flushed and cannot " + + "closed."); + } + od.close(); + fp.close(); + buf1 = buf2 = null; + od = od1 = od2 = null; + } + + // returns the amount of data in the buffer + int getBufSize() { + return getBuffer().size(); + } + + // get the current buffer + private ByteArrayOutputStream getBuffer() { + if (od == od1) { + return buf1; + } else { + return buf2; + } + } + + // + // Flush current buffer to output stream, swap buffers + // This is protected by the flushLock. + // + void swap() { + if (od == od1) { + od = od2; + } else { + od = od1; + } + } + + // + // Flush old buffer to persistent store + // + void flushAndSyncOld() throws IOException { + numSync++; + ByteArrayOutputStream oldbuf; + if (od == od1) { + oldbuf = buf2; + } else { + oldbuf = buf1; + } + long start = FSNamesystem.now(); + oldbuf.writeTo(fp); // write data to file + oldbuf.reset(); // erase all data in buf + fc.force(true); // sync to persistent store + long end = FSNamesystem.now(); + totalTimeSync += (end - start); + } + + long getTotalSyncTime() { + return totalTimeSync; + } + + long getNumSync() { + return numSync; + } } FSEditLog(FSImage image) { fsimage = image; - lastModificationTime = 0; - lastSyncTime = 0; + isSyncRunning = false; + metrics = NameNode.getNameNodeMetrics(); } private File getEditFile(int idx) { @@ -101,6 +238,7 @@ * @throws IOException */ synchronized void open() throws IOException { + numTransactions = totalTimeTransactions = 0; int size = getNumStorageDirs(); if (editStreams == null) editStreams = new ArrayList<EditLogOutputStream>(size); @@ -138,9 +276,18 @@ * Shutdown the filestore */ synchronized void close() throws IOException { + while (isSyncRunning) { + try { + wait(1000); + } catch (InterruptedException ie) { + } + } if (editStreams == null) { return; } + printStatistics(true); + numTransactions = totalTimeTransactions = 0; + for (int idx = 0; idx < editStreams.size(); idx++) { EditLogOutputStream eStream = editStreams.get(idx); try { @@ -175,6 +322,38 @@ } /** + * The specified streams have IO errors. Remove them from logging + * new transactions. + */ + private void processIOError(ArrayList<EditLogOutputStream> errorStreams) { + if (errorStreams == null) { + return; // nothing to do + } + for (int idx = 0; idx < errorStreams.size(); idx++) { + EditLogOutputStream eStream = errorStreams.get(idx); + int j = 0; + for (j = 0; j < editStreams.size(); j++) { + if (editStreams.get(j) == eStream) { + break; + } + } + if (j == editStreams.size()) { + FSNamesystem.LOG.error("Unable to find sync log on which " + + " IO error occured. " + + "Fatal Error."); + Runtime.getRuntime().exit(-1); + } + try { + processIOError(idx); + } catch (IOException e) { + FSNamesystem.LOG.error("Unable to sync edit log. " + + "Fatal Error."); + Runtime.getRuntime().exit(-1); + } + } + } + + /** * check if ANY edits.new log exists */ boolean existsNew() throws IOException { @@ -425,65 +604,140 @@ */ synchronized void logEdit(byte op, Writable w1, Writable w2) { assert this.getNumEditStreams() > 0 : "no editlog streams"; + long start = FSNamesystem.now(); for (int idx = 0; idx < editStreams.size(); idx++) { - EditLogOutputStream eStream; - synchronized (eStream = editStreams.get(idx)) { + EditLogOutputStream eStream = editStreams.get(idx); + try { + DataOutputStream od = eStream.getOutputStream(); + od.write(op); + if (w1 != null) { + w1.write(od); + } + if (w2 != null) { + w2.write(od); + } + } catch (IOException ie) { try { - eStream.write(op); - if (w1 != null) { - w1.write(eStream); - } - if (w2 != null) { - w2.write(eStream); - } - } catch (IOException ie) { - try { - processIOError(idx); - } catch (IOException e) { - FSNamesystem.LOG.error("Unable to append to edit log. " + - "Fatal Error."); - Runtime.getRuntime().exit(-1); - } + processIOError(idx); + } catch (IOException e) { + FSNamesystem.LOG.error("Unable to append to edit log. " + + "Fatal Error."); + Runtime.getRuntime().exit(-1); } } } + // get a new transactionId + txid++; + // - // record the time when new data was written to the edits log + // record the transactionId when new data was written to the edits log // - lastModificationTime = System.currentTimeMillis(); + TransactionId id = (TransactionId)myTransactionId.get(); + id.txid = txid; + + // update statistics + long end = FSNamesystem.now(); + numTransactions++; + totalTimeTransactions += (end-start); + metrics.incrNumTransactions(1, (int)(end-start)); } // - // flush all data of the Edits log into persistent store + // Sync all modifications done by this thread. // - synchronized void logSync() { - assert this.getNumEditStreams() > 0 : "no editlog streams"; + void logSync() { + ArrayList<EditLogOutputStream> errorStreams = null; + long syncStart = 0; + + // Fetch the transactionId of this thread. + TransactionId id = (TransactionId)myTransactionId.get(); + long mytxid = id.txid; + + synchronized (this) { + assert this.getNumEditStreams() > 0 : "no editlog streams"; + printStatistics(false); - // - // If data was generated before the beginning of the last sync time - // then there is nothing to flush - // - if (lastModificationTime < lastSyncTime) { - return; + // if somebody is already syncing, then wait + while (mytxid > synctxid && isSyncRunning) { + try { + wait(1000); + } catch (InterruptedException ie) { + } + } + + // + // If this transaction was already flushed, then nothing to do + // + if (mytxid <= synctxid) { + return; + } + + // now, this thread will do the sync + syncStart = txid; + isSyncRunning = true; + + // swap buffers + for (int idx = 0; idx < editStreams.size(); idx++) { + EditLogOutputStream eStream = editStreams.get(idx); + eStream.swap(); + } } - lastSyncTime = System.currentTimeMillis(); + // do the sync + long start = FSNamesystem.now(); for (int idx = 0; idx < editStreams.size(); idx++) { - EditLogOutputStream eStream; - synchronized (eStream = editStreams.get(idx)) { - try { - eStream.flushAndSync(); - } catch (IOException ie) { - try { - processIOError(idx); - } catch (IOException e) { - FSNamesystem.LOG.error("Unable to sync edit log. " + - "Fatal Error."); - Runtime.getRuntime().exit(-1); - } + EditLogOutputStream eStream = editStreams.get(idx); + try { + eStream.flushAndSyncOld(); + } catch (IOException ie) { + // + // remember the streams that encountered an error. + // + if (errorStreams == null) { + errorStreams = new ArrayList<EditLogOutputStream>(1); } + errorStreams.add(eStream); + FSNamesystem.LOG.error("Unable to sync edit log. " + + "Fatal Error."); } } + long elapsed = FSNamesystem.now() - start; + + synchronized (this) { + processIOError(errorStreams); + synctxid = syncStart; + isSyncRunning = false; + this.notifyAll(); + } + + metrics.incrSyncs(1, (int)elapsed); + } + + // + // print statistics every 1 minute. + // + private void printStatistics(boolean force) { + long now = FSNamesystem.now(); + if (lastPrintTime + 60000 < now && !force) { + return; + } + if (editStreams == null) { + return; + } + lastPrintTime = now; + StringBuffer buf = new StringBuffer(); + + buf.append("Number of transactions: " + numTransactions + + " Total time for transactions(ms): " + + totalTimeTransactions); + buf.append(" Number of syncs: " + editStreams.get(0).getNumSync()); + buf.append(" SyncTimes(ms): "); + for (int idx = 0; idx < editStreams.size(); idx++) { + EditLogOutputStream eStream = editStreams.get(idx); + buf.append(eStream.getTotalSyncTime()); + buf.append(" "); + } + FSNamesystem.LOG.info(buf); } /** @@ -561,10 +815,10 @@ assert(getNumStorageDirs() == editStreams.size()); long size = 0; for (int idx = 0; idx < getNumStorageDirs(); idx++) { - synchronized (editStreams.get(idx)) { - assert(size == 0 || size == getEditFile(idx).length()); - size = getEditFile(idx).length(); - } + EditLogOutputStream eStream = editStreams.get(idx); + assert(size == 0 || + size == getEditFile(idx).length() + eStream.getBufSize()); + size = getEditFile(idx).length() + eStream.getBufSize(); } return size; } @@ -653,5 +907,10 @@ */ synchronized long getFsEditTime() throws IOException { return getEditFile(0).lastModified(); + } + + // sets the initial capacity of the flush buffer. + static void setBufferCapacity(int size) { + sizeFlushBuffer = size; } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=583323&r1=583322&r2=583323&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Tue Oct 9 16:48:46 2007 @@ -31,12 +31,6 @@ import java.util.Collection; import java.util.Iterator; -import org.apache.hadoop.metrics.MetricsRecord; -import org.apache.hadoop.metrics.MetricsUtil; -import org.apache.hadoop.metrics.MetricsContext; -import org.apache.hadoop.metrics.Updater; -import org.apache.hadoop.metrics.jvm.JvmMetrics; - /********************************************************** * NameNode serves as both directory namespace manager and * "inode table" for the Hadoop DFS. There is a single NameNode @@ -105,62 +99,11 @@ format(conf, false); } - private static class NameNodeMetrics implements Updater { - private final MetricsRecord metricsRecord; - - private int numFilesCreated = 0; - private int numFilesOpened = 0; - private int numFilesRenamed = 0; - private int numFilesListed = 0; - - NameNodeMetrics(Configuration conf) { - String sessionId = conf.get("session.id"); - // Initiate Java VM metrics - JvmMetrics.init("NameNode", sessionId); - // Create a record for NameNode metrics - MetricsContext metricsContext = MetricsUtil.getContext("dfs"); - metricsRecord = MetricsUtil.createRecord(metricsContext, "namenode"); - metricsRecord.setTag("sessionId", sessionId); - metricsContext.registerUpdater(this); - } - - /** - * Since this object is a registered updater, this method will be called - * periodically, e.g. every 5 seconds. - */ - public void doUpdates(MetricsContext unused) { - synchronized (this) { - metricsRecord.incrMetric("files_created", numFilesCreated); - metricsRecord.incrMetric("files_opened", numFilesOpened); - metricsRecord.incrMetric("files_renamed", numFilesRenamed); - metricsRecord.incrMetric("files_listed", numFilesListed); - - numFilesCreated = 0; - numFilesOpened = 0; - numFilesRenamed = 0; - numFilesListed = 0; - } - metricsRecord.update(); - } - - synchronized void createFile() { - ++numFilesCreated; - } - - synchronized void openFile() { - ++numFilesOpened; - } - - synchronized void renameFile() { - ++numFilesRenamed; - } - - synchronized void listFile(int nfiles) { - numFilesListed += nfiles; - } + static NameNodeMetrics myMetrics; + + public static NameNodeMetrics getNameNodeMetrics() { + return myMetrics; } - - private NameNodeMetrics myMetrics; /** * Initialize the server Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java?rev=583323&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java Tue Oct 9 16:48:46 2007 @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.dfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.Updater; +import org.apache.hadoop.metrics.jvm.JvmMetrics; + +class NameNodeMetrics implements Updater { + private final MetricsRecord metricsRecord; + + private int numFilesCreated = 0; + private int numFilesOpened = 0; + private int numFilesRenamed = 0; + private int numFilesListed = 0; + + private int numTransactions = 0; + private int totalTimeTransactionsLogMemory = 0; + private int numSyncs = 0; + private int totalTimeSyncs = 0; + + NameNodeMetrics(Configuration conf) { + String sessionId = conf.get("session.id"); + // Initiate Java VM metrics + JvmMetrics.init("NameNode", sessionId); + // Create a record for NameNode metrics + MetricsContext metricsContext = MetricsUtil.getContext("dfs"); + metricsRecord = MetricsUtil.createRecord(metricsContext, "namenode"); + metricsRecord.setTag("sessionId", sessionId); + metricsContext.registerUpdater(this); + } + + /** + * Since this object is a registered updater, this method will be called + * periodically, e.g. every 5 seconds. + */ + public void doUpdates(MetricsContext unused) { + synchronized (this) { + metricsRecord.incrMetric("files_created", numFilesCreated); + metricsRecord.incrMetric("files_opened", numFilesOpened); + metricsRecord.incrMetric("files_renamed", numFilesRenamed); + metricsRecord.incrMetric("files_listed", numFilesListed); + metricsRecord.incrMetric("num_transactions", numTransactions); + metricsRecord.incrMetric("avg_time_transactions_memory", + getAverageTimeTransaction()); + metricsRecord.incrMetric("num_syncs", numSyncs); + metricsRecord.incrMetric("avg_time_transactions_sync", + getAverageTimeSync()); + + numFilesCreated = 0; + numFilesOpened = 0; + numFilesRenamed = 0; + numFilesListed = 0; + numTransactions = 0; + totalTimeTransactionsLogMemory = 0; + numSyncs = 0; + totalTimeSyncs = 0; + } + metricsRecord.update(); + } + + synchronized void createFile() { + ++numFilesCreated; + } + + synchronized void openFile() { + ++numFilesOpened; + } + + synchronized void renameFile() { + ++numFilesRenamed; + } + + synchronized void listFile(int nfiles) { + numFilesListed += nfiles; + } + + synchronized void incrNumTransactions(int count, int time) { + numTransactions += count; + totalTimeTransactionsLogMemory += time; + } + + synchronized void incrSyncs(int count, int time) { + numSyncs += count; + totalTimeSyncs += time; + } + + synchronized private int getAverageTimeTransaction() { + if (numTransactions == 0) { + return 0; + } + return totalTimeTransactionsLogMemory/numTransactions; + } + + synchronized private int getAverageTimeSync() { + if (numSyncs == 0) { + return 0; + } + return totalTimeSyncs/numSyncs; + } +} Propchange: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java ------------------------------------------------------------------------------ svn:keywords = Id Revision HeadURL Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java?rev=583323&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java Tue Oct 9 16:48:46 2007 @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.dfs; + +import junit.framework.TestCase; +import java.io.*; +import java.util.Collection; +import java.util.Iterator; +import java.util.Random; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.UTF8; +import org.apache.hadoop.io.Writable; + + +/** + * This class tests the creation and validation of a checkpoint. + */ +public class TestEditLog extends TestCase { + static final int numDatanodes = 1; + + // This test creates numThreads threads and each thread does + // numberTransactions Transactions concurrently. + int numberTransactions = 1000; + int numThreads = 100; + + // + // an object that does a bunch of transactions + // + static class Transactions implements Runnable { + FSEditLog editLog; + int numTransactions; + short replication = 3; + long blockSize = 64; + + Transactions(FSEditLog editlog, int num) { + editLog = editlog; + numTransactions = num; + } + + // add a bunch of transactions. + public void run() { + for (int i = 0; i < numTransactions; i++) { + INodeFile inode = new INodeFile(0, replication, 0, blockSize); + editLog.logCreateFile("/filename" + i, inode); + editLog.logSync(); + } + } + } + + /** + * Tests transaction logging in dfs. + */ + public void testEditLog() throws IOException { + + // start a cluster + + Collection<File> namedirs = null; + Configuration conf = new Configuration(); + MiniDFSCluster cluster = new MiniDFSCluster(0, conf, numDatanodes, + true, true, null, null); + cluster.waitActive(); + FileSystem fileSys = cluster.getFileSystem(); + int numdirs = 0; + + try { + namedirs = cluster.getNameDirs(); + } finally { + fileSys.close(); + cluster.shutdown(); + } + + for (Iterator it = namedirs.iterator(); it.hasNext(); ) { + File dir = (File)it.next(); + System.out.println(dir); + numdirs++; + } + + FSImage fsimage = new FSImage(namedirs); + FSEditLog editLog = fsimage.getEditLog(); + + // set small size of flush buffer + editLog.setBufferCapacity(2048); + editLog.close(); + editLog.open(); + + // Create threads and make them run transactions concurrently. + Thread threadId[] = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + Transactions trans = new Transactions(editLog, numberTransactions); + threadId[i] = new Thread(trans, "TransactionThread-" + i); + threadId[i].start(); + } + + // wait for all transactions to get over + for (int i = 0; i < numThreads; i++) { + try { + threadId[i].join(); + } catch (InterruptedException e) { + i--; // retry + } + } + + editLog.close(); + + // Verify that we can read in all the transactions that we have written. + // If there were any corruptions, it is likely that the reading in + // of these transactions will throw an exception. + // + for (int i = 0; i < numdirs; i++) { + File editFile = fsimage.getEditFile(i); + System.out.println("Verifying file: " + editFile); + int numEdits = editLog.loadFSEdits(editFile); + assertTrue("Verification for " + editFile + " failed. " + + "Expected " + (numThreads * numberTransactions) + " transactions. "+ + "Found " + numEdits + " transactions.", + numEdits == numThreads * numberTransactions); + + } + } +} Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java ------------------------------------------------------------------------------ svn:keywords = Id Revision HeadURL