This package (org.apache.flume.durability) was something I wanted to rename -- maybe make it org.apache.flume.wal?
Jon. On Mon, Aug 22, 2011 at 3:04 PM, <[email protected]> wrote: > Author: esammer > Date: Mon Aug 22 22:04:00 2011 > New Revision: 1160466 > > URL: http://svn.apache.org/viewvc?rev=1160466&view=rev > Log: > - Skeleton of a file-based WAL implementation. Still fuzzy. > > Added: > > > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/ > > > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java > > > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java > > Added: > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java > URL: > http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java?rev=1160466&view=auto > > ============================================================================== > --- > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java > (added) > +++ > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java > Mon Aug 22 22:04:00 2011 > @@ -0,0 +1,129 @@ > +package org.apache.flume.durability; > + > +import java.io.File; > +import java.io.IOException; > + > +import org.apache.flume.durability.FileBasedWALManager.FileBasedWALWriter; > +import org.apache.flume.formatter.output.EventFormatter; > +import org.apache.flume.formatter.output.TextDelimitedOutputFormatter; > +import org.slf4j.Logger; > +import org.slf4j.LoggerFactory; > + > +import com.google.common.base.Preconditions; > + > +public class FileBasedWAL { > + > + private static final Logger logger = LoggerFactory > + .getLogger(FileBasedWAL.class); > + > + private File baseDirectory; > + private EventFormatter formatter; > + > + private File openDirectory; > + private File pendingDirectory; > + private 
File sentDirectory; > + private File completeDirectory; > + private boolean isInitialized; > + > + public FileBasedWAL(File baseDirectory) { > + Preconditions.checkNotNull(baseDirectory, > + "WAL base directory may not be null"); > + > + this.baseDirectory = baseDirectory; > + > + openDirectory = new File(baseDirectory, "open"); > + pendingDirectory = new File(baseDirectory, "pending"); > + sentDirectory = new File(baseDirectory, "sent"); > + completeDirectory = new File(baseDirectory, "complete"); > + > + /* FIXME: This shouldn't be hardcoded (and shouldn't be text!). */ > + formatter = new TextDelimitedOutputFormatter(); > + > + isInitialized = false; > + } > + > + public void initialize() throws IOException { > + logger.info("Initializing file-based WAL at {}", baseDirectory); > + > + /* > + * NB: We purposefully check pathological (hey, that's a pun!) error > cases > + * to improve error messages. Resist the urge to condense these > checks. > + * Resist the urge to just use mkdirs(); it could potentially expose > + * sensitive data (i.e. path creation / permission setting races). 
> + */ > + File parentDirectory = baseDirectory.getParentFile(); > + > + Preconditions.checkState(parentDirectory.exists(), "WAL parent > directory (" > + + parentDirectory + ") does not exist"); > + Preconditions.checkState(parentDirectory.isDirectory(), "WAL parent (" > + + parentDirectory + " ) is not a directory"); > + > + if (!baseDirectory.exists()) { > + if (!baseDirectory.mkdir()) { > + throw new IOException("Unable to create WAL base directory " > + + baseDirectory); > + } > + } else { > + Preconditions.checkState(baseDirectory.isDirectory(), > + "WAL base directory " + baseDirectory > + + " exists but it isn't a directory"); > + } > + > + openDirectory.mkdir(); > + Preconditions.checkState(openDirectory.exists(), > + "Directory doesn't exist: %s", openDirectory); > + > + pendingDirectory.mkdir(); > + Preconditions.checkState(pendingDirectory.exists(), > + "Directory doesn't exist: %s", pendingDirectory); > + > + sentDirectory.mkdir(); > + Preconditions.checkState(sentDirectory.exists(), > + "Directory doesn't exist: %s", sentDirectory); > + > + completeDirectory.mkdir(); > + Preconditions.checkState(completeDirectory.exists(), > + "Directory doesn't exist: %s", completeDirectory); > + } > + > + public FileBasedWALWriter getWriter() throws IOException { > + FileBasedWALWriter writer = new FileBasedWALWriter(); > + > + writer.setFormatter(formatter); > + > + if (!isInitialized) { > + initialize(); > + } > + > + return writer; > + } > + > + @Override > + public String toString() { > + return "{ baseDirectory:" + baseDirectory + " openDirectory:" > + + openDirectory + " pendingDirectory:" + pendingDirectory > + + " sentDirectory:" + sentDirectory + " completeDirectory:" > + + completeDirectory + " isInitialized:" + isInitialized + " }"; > + } > + > + public File getBaseDirectory() { > + return baseDirectory; > + } > + > + public File getOpenDirectory() { > + return openDirectory; > + } > + > + public File getPendingDirectory() { > + return pendingDirectory; > 
+ } > + > + public File getSentDirectory() { > + return sentDirectory; > + } > + > + public File getCompleteDirectory() { > + return completeDirectory; > + } > + > +} > > Added: > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java > URL: > http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java?rev=1160466&view=auto > > ============================================================================== > --- > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java > (added) > +++ > incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java > Mon Aug 22 22:04:00 2011 > @@ -0,0 +1,76 @@ > +package org.apache.flume.durability; > + > +import java.io.BufferedOutputStream; > +import java.io.File; > +import java.io.FileNotFoundException; > +import java.io.FileOutputStream; > +import java.io.IOException; > + > +import org.apache.flume.Event; > +import org.apache.flume.formatter.output.EventFormatter; > + > +public class FileBasedWALManager { > + > + private File directory; > + > + public FileBasedWAL getWAL(String name) { > + File walDirectory = new File(directory, name); > + FileBasedWAL wal = new FileBasedWAL(walDirectory); > + > + return wal; > + } > + > + @Override > + public String toString() { > + return "{ directory:" + directory + " }"; > + } > + > + public File getDirectory() { > + return directory; > + } > + > + public void setDirectory(File directory) { > + this.directory = directory; > + } > + > + public static class FileBasedWALWriter { > + > + private File file; > + private BufferedOutputStream outputStream; > + private EventFormatter formatter; > + > + public void open() throws FileNotFoundException { > + outputStream = new BufferedOutputStream(new FileOutputStream(file)); > + } > + > + public 
void write(Event event) throws IOException { > + outputStream.write(formatter.format(event)); > + } > + > + public void close() throws IOException { > + outputStream.close(); > + } > + > + public void flush() throws IOException { > + outputStream.flush(); > + } > + > + public File getFile() { > + return file; > + } > + > + public void setFile(File file) { > + this.file = file; > + } > + > + public EventFormatter getFormatter() { > + return formatter; > + } > + > + public void setFormatter(EventFormatter formatter) { > + this.formatter = formatter; > + } > + > + } > + > +} > > > -- // Jonathan Hsieh (shay) // Software Engineer, Cloudera // [email protected]
