Sure. Makes sense.

On Tue, Aug 23, 2011 at 6:00 AM, Jonathan Hsieh <[email protected]> wrote:
> this was something I wanted to rename -- maybe make it > org.apache.flume.wal? > > Jon. > > On Mon, Aug 22, 2011 at 3:04 PM, <[email protected]> wrote: > > > Author: esammer > > Date: Mon Aug 22 22:04:00 2011 > > New Revision: 1160466 > > > > URL: http://svn.apache.org/viewvc?rev=1160466&view=rev > > Log: > > - Skeleton of a file-based WAL implementation. Still fuzzy. > > > > Added: > > > > > > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/ > > > > > > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java > > > > > > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java > > > > Added: > > > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java > > URL: > > > http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java?rev=1160466&view=auto > > > > > ============================================================================== > > --- > > > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java > > (added) > > +++ > > > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java > > Mon Aug 22 22:04:00 2011 > > @@ -0,0 +1,129 @@ > > +package org.apache.flume.durability; > > + > > +import java.io.File; > > +import java.io.IOException; > > + > > +import > org.apache.flume.durability.FileBasedWALManager.FileBasedWALWriter; > > +import org.apache.flume.formatter.output.EventFormatter; > > +import org.apache.flume.formatter.output.TextDelimitedOutputFormatter; > > +import org.slf4j.Logger; > > +import org.slf4j.LoggerFactory; > > + > > +import com.google.common.base.Preconditions; > > + > > +public class FileBasedWAL { > > + > > + private static final Logger logger = 
LoggerFactory > > + .getLogger(FileBasedWAL.class); > > + > > + private File baseDirectory; > > + private EventFormatter formatter; > > + > > + private File openDirectory; > > + private File pendingDirectory; > > + private File sentDirectory; > > + private File completeDirectory; > > + private boolean isInitialized; > > + > > + public FileBasedWAL(File baseDirectory) { > > + Preconditions.checkNotNull(baseDirectory, > > + "WAL base directory may not be null"); > > + > > + this.baseDirectory = baseDirectory; > > + > > + openDirectory = new File(baseDirectory, "open"); > > + pendingDirectory = new File(baseDirectory, "pending"); > > + sentDirectory = new File(baseDirectory, "sent"); > > + completeDirectory = new File(baseDirectory, "complete"); > > + > > + /* FIXME: This shouldn't be hardcoded (and shouldn't be text!). */ > > + formatter = new TextDelimitedOutputFormatter(); > > + > > + isInitialized = false; > > + } > > + > > + public void initialize() throws IOException { > > + logger.info("Initializing file-based WAL at {}", baseDirectory); > > + > > + /* > > + * NB: We purposefully check pathological (hey, that's a pun!) error > > cases > > + * to improve error messages. Resist the urge to condense these > > checks. > > + * Resist the urge to just use mkdirs(); it could potentially expose > > + * sensitive data (i.e. path creation / permission setting races). 
> > + */ > > + File parentDirectory = baseDirectory.getParentFile(); > > + > > + Preconditions.checkState(parentDirectory.exists(), "WAL parent > > directory (" > > + + parentDirectory + ") does not exist"); > > + Preconditions.checkState(parentDirectory.isDirectory(), "WAL parent > (" > > + + parentDirectory + " ) is not a directory"); > > + > > + if (!baseDirectory.exists()) { > > + if (!baseDirectory.mkdir()) { > > + throw new IOException("Unable to create WAL base directory " > > + + baseDirectory); > > + } > > + } else { > > + Preconditions.checkState(baseDirectory.isDirectory(), > > + "WAL base directory " + baseDirectory > > + + " exists but it isn't a directory"); > > + } > > + > > + openDirectory.mkdir(); > > + Preconditions.checkState(openDirectory.exists(), > > + "Directory doesn't exist: %s", openDirectory); > > + > > + pendingDirectory.mkdir(); > > + Preconditions.checkState(pendingDirectory.exists(), > > + "Directory doesn't exist: %s", pendingDirectory); > > + > > + sentDirectory.mkdir(); > > + Preconditions.checkState(sentDirectory.exists(), > > + "Directory doesn't exist: %s", sentDirectory); > > + > > + completeDirectory.mkdir(); > > + Preconditions.checkState(completeDirectory.exists(), > > + "Directory doesn't exist: %s", completeDirectory); > > + } > > + > > + public FileBasedWALWriter getWriter() throws IOException { > > + FileBasedWALWriter writer = new FileBasedWALWriter(); > > + > > + writer.setFormatter(formatter); > > + > > + if (!isInitialized) { > > + initialize(); > > + } > > + > > + return writer; > > + } > > + > > + @Override > > + public String toString() { > > + return "{ baseDirectory:" + baseDirectory + " openDirectory:" > > + + openDirectory + " pendingDirectory:" + pendingDirectory > > + + " sentDirectory:" + sentDirectory + " completeDirectory:" > > + + completeDirectory + " isInitialized:" + isInitialized + " }"; > > + } > > + > > + public File getBaseDirectory() { > > + return baseDirectory; > > + } > > + > > + public File 
getOpenDirectory() { > > + return openDirectory; > > + } > > + > > + public File getPendingDirectory() { > > + return pendingDirectory; > > + } > > + > > + public File getSentDirectory() { > > + return sentDirectory; > > + } > > + > > + public File getCompleteDirectory() { > > + return completeDirectory; > > + } > > + > > +} > > > > Added: > > > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java > > URL: > > > http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java?rev=1160466&view=auto > > > > > ============================================================================== > > --- > > > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java > > (added) > > +++ > > > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java > > Mon Aug 22 22:04:00 2011 > > @@ -0,0 +1,76 @@ > > +package org.apache.flume.durability; > > + > > +import java.io.BufferedOutputStream; > > +import java.io.File; > > +import java.io.FileNotFoundException; > > +import java.io.FileOutputStream; > > +import java.io.IOException; > > + > > +import org.apache.flume.Event; > > +import org.apache.flume.formatter.output.EventFormatter; > > + > > +public class FileBasedWALManager { > > + > > + private File directory; > > + > > + public FileBasedWAL getWAL(String name) { > > + File walDirectory = new File(directory, name); > > + FileBasedWAL wal = new FileBasedWAL(walDirectory); > > + > > + return wal; > > + } > > + > > + @Override > > + public String toString() { > > + return "{ directory:" + directory + " }"; > > + } > > + > > + public File getDirectory() { > > + return directory; > > + } > > + > > + public void setDirectory(File directory) { > > + this.directory = directory; > > + } > > + > > + public static class 
FileBasedWALWriter { > > + > > + private File file; > > + private BufferedOutputStream outputStream; > > + private EventFormatter formatter; > > + > > + public void open() throws FileNotFoundException { > > + outputStream = new BufferedOutputStream(new > FileOutputStream(file)); > > + } > > + > > + public void write(Event event) throws IOException { > > + outputStream.write(formatter.format(event)); > > + } > > + > > + public void close() throws IOException { > > + outputStream.close(); > > + } > > + > > + public void flush() throws IOException { > > + outputStream.flush(); > > + } > > + > > + public File getFile() { > > + return file; > > + } > > + > > + public void setFile(File file) { > > + this.file = file; > > + } > > + > > + public EventFormatter getFormatter() { > > + return formatter; > > + } > > + > > + public void setFormatter(EventFormatter formatter) { > > + this.formatter = formatter; > > + } > > + > > + } > > + > > +} > > > > > > > > > -- > // Jonathan Hsieh (shay) > // Software Engineer, Cloudera > // [email protected] > -- Eric Sammer twitter: esammer data: www.cloudera.com
