Functionally complete. Not yet well tested; some unit tests are included.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/60e7a812
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/60e7a812
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/60e7a812

Branch: refs/heads/1.x-branch
Commit: 60e7a8126aceb85fe194d1cf90818fcda696d60a
Parents: 6fcebe6
Author: Roshan Naik <[email protected]>
Authored: Wed Dec 9 13:10:32 2015 -0800
Committer: Roshan Naik <[email protected]>
Committed: Thu Jan 14 11:34:55 2016 -0800

----------------------------------------------------------------------
 .../hdfs/common/CmpFilesByModificationTime.java |  14 +
 .../org/apache/storm/hdfs/common/HdfsUtils.java |  57 ++
 .../storm/hdfs/spout/AbstractFileReader.java    |  71 ++
 .../org/apache/storm/hdfs/spout/Configs.java    |  44 ++
 .../org/apache/storm/hdfs/spout/DirLock.java    |  74 +++
 .../org/apache/storm/hdfs/spout/FileLock.java   | 263 ++++++++
 .../org/apache/storm/hdfs/spout/FileOffset.java |  36 ++
 .../org/apache/storm/hdfs/spout/FileReader.java |  49 ++
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  | 645 +++++++++++++++++++
 .../apache/storm/hdfs/spout/ParseException.java |  26 +
 .../storm/hdfs/spout/ProgressTracker.java       |  67 ++
 .../storm/hdfs/spout/SequenceFileReader.java    | 227 +++++++
 .../apache/storm/hdfs/spout/TextFileReader.java | 168 +++++
 .../apache/storm/hdfs/spout/TestDirLock.java    | 143 ++++
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 465 +++++++++++++
 .../storm/hdfs/spout/TestProgressTracker.java   | 108 ++++
 16 files changed, 2457 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
new file mode 100644
index 0000000..d194558
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
@@ -0,0 +1,14 @@
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+
+import java.util.Comparator;
+
+
+public class CmpFilesByModificationTime
+        implements Comparator<LocatedFileStatus> {
+  @Override
+  public int compare(LocatedFileStatus o1, LocatedFileStatus o2) {
+    return Long.compare(o1.getModificationTime(), o2.getModificationTime());
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
new file mode 100644
index 0000000..344adf1
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -0,0 +1,57 @@
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+public class HdfsUtils {
+  /** Lists files sorted by modification time, keeping only files that have not been
+   * modified since 'olderThan'. If 'olderThan' is <= 0 then the filtering is disabled. */
+  public static Collection<Path> listFilesByModificationTime(FileSystem fs, Path directory, long olderThan)
+          throws IOException {
+    ArrayList<LocatedFileStatus> fstats = new ArrayList<>();
+
+    RemoteIterator<LocatedFileStatus> itr = fs.listFiles(directory, false);
+    while( itr.hasNext() ) {
+      LocatedFileStatus fileStatus = itr.next();
+      // when filtering is enabled, keep only files not modified since 'olderThan'
+      if( olderThan<=0 || fileStatus.getModificationTime()<olderThan )
+        fstats.add(fileStatus);
+    }
+    Collections.sort(fstats, new CmpFilesByModificationTime() );
+
+    ArrayList<Path> result = new ArrayList<>(fstats.size());
+    for (LocatedFileStatus fstat : fstats) {
+      result.add(fstat.getPath());
+    }
+    return result;
+  }
+
+  public static class Pair<K,V> {
+    private K key;
+    private V value;
+    public Pair(K key, V value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    public K getKey() {
+      return key;
+    }
+
+    public V getValue() {
+      return value;
+    }
+
+    public static <K,V> Pair<K,V> of(K key, V value) {
+      return new Pair<>(key, value);
+    }
+  } // class Pair
+}
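For illustration only (not part of this commit), a minimal sketch of how a caller might use the helper above; the directory path and the cutoff are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.storm.hdfs.common.HdfsUtils;

    public class ListOldFilesExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());        // default configured filesystem
        long cutoff = System.currentTimeMillis() - 5 * 60 * 1000;   // files untouched for 5+ minutes
        // returns paths sorted oldest-first; passing 0 disables the age filter
        for (Path p : HdfsUtils.listFilesByModificationTime(fs, new Path("/data/in"), cutoff)) {
          System.out.println(p);
        }
      }
    }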
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
new file mode 100644
index 0000000..09dc0d3
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import backtype.storm.tuple.Fields;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+abstract class AbstractFileReader implements FileReader {
+
+  private final Path file;
+  private final FileSystem fs;
+  private Fields fields;
+
+  public AbstractFileReader(FileSystem fs, Path file, Fields fieldNames) {
+    if (fs == null || file == null)
+      throw new IllegalArgumentException("file and filesystem args cannot be null");
+    this.fs = fs;
+    this.file = file;
+    this.fields = fieldNames;
+  }
+
+  @Override
+  public Path getFilePath() {
+    return file;
+  }
+
+
+  @Override
+  public Fields getOutputFields() {
+    return fields;
+  }
+
+  @Override
+  public void setFields(String... fieldNames) {
+    this.fields = new Fields(fieldNames);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    AbstractFileReader that = (AbstractFileReader) o;
+    return file != null ? file.equals(that.file) : that.file == null;
+  }
+
+  @Override
+  public int hashCode() {
+    return file != null ? file.hashCode() : 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
new file mode 100644
index 0000000..66b8972
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+public class Configs {
+  public static final String READER_TYPE = "hdfsspout.reader.type";
+  public static final String TEXT = "text";
+  public static final String SEQ = "seq";
+
+  public static final String SOURCE_DIR = "hdfsspout.source.dir";          // dir from which to read files
+  public static final String ARCHIVE_DIR = "hdfsspout.archive.dir";        // completed files will be moved here
+  public static final String BAD_DIR = "hdfsspout.badfiles.dir";           // unparseable files will be moved here
+  public static final String LOCK_DIR = "hdfsspout.lock.dir";              // dir in which lock files will be created
+  public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count"; // commit after N records
+  public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec";     // commit after N secs
+  public static final String MAX_DUPLICATE = "hdfsspout.max.duplicate";    // limit on acked offsets tracked before the spout pauses emission
+  public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec";  // inactivity duration after which locks are considered candidates for being reassigned to another spout
+  public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync";    // whether clocks on the machines in the Storm cluster are in sync
+
+  public static final String DEFAULT_LOCK_DIR = ".lock";
+  public static final int DEFAULT_COMMIT_FREQ_COUNT = 10000;
+  public static final int DEFAULT_COMMIT_FREQ_SEC = 10;
+  public static final int DEFAULT_MAX_DUPLICATES = 100;
+  public static final int DEFAULT_LOCK_TIMEOUT = 5 * 60;                   // 5 min
+  public static final String DEFAULT_HDFS_CONFIG_KEY = "hdfs.config";
+
+
+} // class Configs
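For illustration only (not part of this commit), a sketch of wiring these settings into a topology; the paths, component id, and parallelism are placeholders:

    import backtype.storm.Config;
    import backtype.storm.topology.TopologyBuilder;
    import org.apache.storm.hdfs.spout.Configs;
    import org.apache.storm.hdfs.spout.HdfsSpout;

    public class HdfsSpoutTopologyExample {
      public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("hdfsSpout", new HdfsSpout(), 1);

        Config conf = new Config();
        conf.put(Configs.READER_TYPE, Configs.TEXT);   // "text", "seq", or a reader class name
        conf.put(Configs.SOURCE_DIR, "/data/in");      // dir from which to read files
        conf.put(Configs.ARCHIVE_DIR, "/data/done");   // completed files are moved here
        conf.put(Configs.BAD_DIR, "/data/bad");        // unparseable files are moved here
        conf.put(Configs.COMMIT_FREQ_SEC, "30");       // commit progress at least every 30 secs
        // submit with LocalCluster or StormSubmitter, as usual
      }
    }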
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
new file mode 100644
index 0000000..ef02a8f
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class DirLock {
+  private FileSystem fs;
+  private final Path lockFile;
+  public static final String DIR_LOCK_FILE = "DIRLOCK";
+  private static final Logger log = LoggerFactory.getLogger(DirLock.class);
+  private DirLock(FileSystem fs, Path lockFile) throws IOException {
+    if( fs.isDirectory(lockFile) )
+      throw new IllegalArgumentException(lockFile.toString() + " must be a file, not a directory");
+    this.fs = fs;
+    this.lockFile = lockFile;
+  }
+
+  /** Returns null if somebody else holds the lock
+   *
+   * @param fs the file system
+   * @param dir the dir on which to get a lock
+   * @return lock object, or null if the lock is already held
+   * @throws IOException if there were errors
+   */
+  public static DirLock tryLock(FileSystem fs, Path dir) throws IOException {
+    Path lockFile = new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE );
+    try {
+      FSDataOutputStream os = fs.create(lockFile, false);
+      if(log.isInfoEnabled()) {
+        log.info("Thread acquired dir lock " + threadInfo() + " - lockfile " + lockFile);
+      }
+      os.close();
+      return new DirLock(fs, lockFile);
+    } catch (FileAlreadyExistsException e) {
+      return null;
+    }
+  }
+
+  private static String threadInfo () {
+    return "ThdId=" + Thread.currentThread().getId() + ", ThdName=" + Thread.currentThread().getName();
+  }
+  public void release() throws IOException {
+    fs.delete(lockFile, false);
+    log.info("Thread {} released dir lock {} ", threadInfo(), lockFile);
+  }
+
+  public Path getLockFile() {
+    return lockFile;
+  }
+}
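To make the coordination between DirLock and the per-file FileLock (next file below) concrete, here is a hedged sketch, not part of this commit, of how the two locks can be combined when claiming a file; the paths and spout id are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.storm.hdfs.spout.DirLock;
    import org.apache.storm.hdfs.spout.FileLock;

    public class LockWorkflowExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path lockDir = new Path("/data/in/.lock");

        DirLock dirLock = DirLock.tryLock(fs, lockDir);  // serialize access to the lock dir
        if (dirLock == null) {
          return;                                        // another spout instance is scanning
        }
        try {
          FileLock fileLock = FileLock.tryLock(fs, new Path("/data/in/f1.txt"), lockDir, "spout-0");
          if (fileLock != null) {
            fileLock.heartbeat("0");                     // record progress as offsets are committed
            fileLock.release();                          // delete the lock once the file is done
          }
        } finally {
          dirLock.release();                             // always release the directory lock
        }
      }
    }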
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
new file mode 100644
index 0000000..f4a6813
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.storm.hdfs.common.HdfsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Collection;
+
+public class FileLock {
+
+  private final FileSystem fs;
+  private final String componentID;
+  private final Path lockFile;
+  private final FSDataOutputStream stream;
+  private LogEntry lastEntry;
+
+  private static final Logger log = LoggerFactory.getLogger(FileLock.class);
+
+  private FileLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId)
+          throws IOException {
+    this.fs = fs;
+    String lockFileName = lockDirPath.toString() + Path.SEPARATOR_CHAR + fileToLock.getName();
+    this.lockFile = new Path(lockFileName);
+    this.stream = fs.create(lockFile);
+    this.componentID = spoutId;
+    logProgress("0", false);
+  }
+
+  private FileLock(FileSystem fs, Path lockFile, String spoutId, LogEntry entry)
+          throws IOException {
+    this.fs = fs;
+    this.lockFile = lockFile;
+    this.stream = fs.append(lockFile);
+    this.componentID = spoutId;
+    log.debug("Acquired abandoned lockFile {}", lockFile);
+    logProgress(entry.fileOffset, true);
+  }
+
+  public void heartbeat(String fileOffset) throws IOException {
+    logProgress(fileOffset, true);
+  }
+
+  // the newline is written at the beginning of each line (instead of the end) for better
+  // recovery from partial writes of prior lines
+  private void logProgress(String fileOffset, boolean prefixNewLine)
+          throws IOException {
+    long now = System.currentTimeMillis();
+    LogEntry entry = new LogEntry(now, componentID, fileOffset);
+    String line = entry.toString();
+    if(prefixNewLine)
+      stream.writeBytes(System.lineSeparator() + line);
+    else
+      stream.writeBytes(line);
+    stream.flush();
+    lastEntry = entry; // update this only after writing to hdfs
+  }
+
+  public void release() throws IOException {
+    stream.close();
+    fs.delete(lockFile, false);
+  }
+
+  // throws an exception immediately if the lock cannot be acquired
+  public static FileLock tryLock(FileSystem hdfs, Path fileToLock, Path lockDirPath, String spoutId)
+          throws IOException {
+    return new FileLock(hdfs, fileToLock, lockDirPath, spoutId);
+  }
+
+  /**
+   * Checks if lockFile is older than 'olderThan' UTC time by examining the modification time
+   * on the file and (if necessary) the timestamp in the last log entry of the file. If it is
+   * stale, returns the last log entry; otherwise returns null.
+   * @param fs the file system
+   * @param lockFile the lock file to examine
+   * @param olderThan time (millis) in UTC.
+   * @return the last entry in the file if it is too old; null otherwise
+   * @throws IOException
+   */
+  public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan)
+          throws IOException {
+    if( fs.getFileStatus(lockFile).getModificationTime() <= olderThan ) {
+      // The HDFS modification timestamp may lag behind recent appends, so we double check
+      // the timestamp in the last line of the file to see when the last update was made
+      LogEntry lastEntry = getLastEntry(fs, lockFile);
+      if(lastEntry==null) {
+        throw new RuntimeException(lockFile.getName() + " is empty. This file is invalid.");
+      }
+      if( lastEntry.eventTime <= olderThan )
+        return lastEntry;
+    }
+    return null;
+  }
+
+  /**
+   * Returns the last log entry in the lock file
+   * @param fs the file system
+   * @param lockFile the lock file to read
+   * @return the last entry, or null if the file is empty
+   * @throws IOException
+   */
+  public static LogEntry getLastEntry(FileSystem fs, Path lockFile)
+          throws IOException {
+    FSDataInputStream in = fs.open(lockFile);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+    String lastLine = null;
+    for(String line = reader.readLine(); line!=null; line = reader.readLine() ) {
+      lastLine=line;
+    }
+    return LogEntry.deserialize(lastLine);
+  }
+
+  /**
+   * Takes ownership of the lock file.
+   * @param lockFile the lock file to own
+   * @param lastEntry last entry in the lock file. This param is an optimization:
+   *                  we don't scan the lock file again to find its last entry here since
+   *                  that has already been done once by the logic that checks whether the
+   *                  lock file is stale, so this value comes from that earlier scan.
+   * @param spoutId spout id
+   * @return a FileLock that owns the lock file
+   */
+  public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId)
+          throws IOException {
+    return new FileLock(fs, lockFile, spoutId, lastEntry);
+  }
+
+  /**
+   * Finds the oldest expired lock file (using the modification timestamp), then takes
+   * ownership of the lock file.
+   * Impt: Assumes access to lockFilesDir has been externally synchronized such that
+   * only one thread is accessing the lock files directory at a time
+   * @param fs the file system
+   * @param lockFilesDir dir containing the lock files
+   * @param locktimeoutSec expiry duration for locks
+   * @return the acquired lock, or null if no expired lock file was found
+   */
+  public static FileLock acquireOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId)
+          throws IOException {
+    // list files
+    long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000);
+    Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan);
+
+    // locate the oldest expired lock file (if any) and take ownership
+    for (Path file : listing) {
+      if(file.getName().equalsIgnoreCase( DirLock.DIR_LOCK_FILE) )
+        continue;
+      LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan);
+      if(lastEntry!=null)
+        return FileLock.takeOwnership(fs, file, lastEntry, spoutId);
+    }
+    log.info("No abandoned files found");
+    return null;
+  }
+
+
+  /**
+   * Finds the oldest expired lock file (using the modification timestamp), without taking
+   * ownership of it.
+   * Impt: Assumes access to lockFilesDir has been externally synchronized such that
+   * only one thread is accessing the lock files directory at a time
+   * @param fs the file system
+   * @param lockFilesDir dir containing the lock files
+   * @param locktimeoutSec expiry duration for locks
+   * @param spoutId spout id
+   * @return a Pair<lock file path, last entry in lock file> ..
if expired lock file found + * @throws IOException + */ + public static HdfsUtils.Pair<Path,LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId) + throws IOException { + // list files + long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000); + Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan); + + // locate oldest expired lock file (if any) and take ownership + for (Path file : listing) { + if(file.getName().equalsIgnoreCase( DirLock.DIR_LOCK_FILE) ) + continue; + LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan); + if(lastEntry!=null) + return new HdfsUtils.Pair<>(file, lastEntry); + } + log.info("No abandoned files found"); + return null; + } + + public LogEntry getLastLogEntry() { + return lastEntry; + } + + public Path getLockFile() { + return lockFile; + } + + public static class LogEntry { + private static final int NUM_FIELDS = 3; + public final long eventTime; + public final String componentID; + public final String fileOffset; + + public LogEntry(long eventtime, String componentID, String fileOffset) { + this.eventTime = eventtime; + this.componentID = componentID; + this.fileOffset = fileOffset; + } + + public String toString() { + return eventTime + "," + componentID + "," + fileOffset; + } + public static LogEntry deserialize(String line) { + String[] fields = line.split(",", NUM_FIELDS); + return new LogEntry(Long.parseLong(fields[0]), fields[1], fields[2]); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof LogEntry)) return false; + + LogEntry logEntry = (LogEntry) o; + + if (eventTime != logEntry.eventTime) return false; + if (!componentID.equals(logEntry.componentID)) return false; + return fileOffset.equals(logEntry.fileOffset); + + } + + @Override + public int hashCode() { + int result = (int) (eventTime ^ (eventTime >>> 32)); + result = 31 * result + componentID.hashCode(); + result = 31 * result + fileOffset.hashCode(); + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java new file mode 100644 index 0000000..ea8c1e1 --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.hdfs.spout; + +/** + * Represents the notion of an offset in a file. 
Idea is accommodate representing file + * offsets other than simple byte offset as it may be insufficient for certain formats. + * Reader for each format implements this as appropriate for its needs. + * Note: Derived types must : + * - implement equals() & hashCode() appropriately. + * - implement Comparable<> appropriately. + * - implement toString() appropriately for serialization. + * - constructor(string) for deserialization + */ + +interface FileOffset extends Comparable<FileOffset>, Cloneable { + /** tests if rhs == currOffset+1 */ + boolean isNextOffset(FileOffset rhs); + public FileOffset clone(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java new file mode 100644 index 0000000..78284cf --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.hdfs.spout; + +import backtype.storm.tuple.Fields; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.List; + +interface FileReader { + public Path getFilePath(); + + /** + * A simple numeric value may not be sufficient for certain formats consequently + * this is a String. + */ + public FileOffset getFileOffset(); + + /** + * Get the next tuple from the file + * + * @return null if no more data + * @throws IOException + */ + public List<Object> next() throws IOException, ParseException; + + public Fields getOutputFields(); + + public void setFields(String... fieldNames); + + public void close(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java new file mode 100644 index 0000000..2d4afdb --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -0,0 +1,645 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.hdfs.spout; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import backtype.storm.Config; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.storm.hdfs.common.HdfsUtils; +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; + +public class HdfsSpout extends BaseRichSpout { + + private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class); + + private Path sourceDirPath; + private Path archiveDirPath; + private Path badFilesDirPath; + private Path lockDirPath; + + private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT; + private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC; + private int maxDuplicates = Configs.DEFAULT_MAX_DUPLICATES; + private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT; + private boolean clocksInSync = true; + + private ProgressTracker tracker = new ProgressTracker(); + + private FileSystem hdfs; + private FileReader reader; + + private SpoutOutputCollector collector; + HashMap<MessageId, List<Object> > inflight = new HashMap<>(); + LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>(); + + private String inprogress_suffix = ".inprogress"; + + private Configuration hdfsConfig; + private String readerType; + + private Map conf = null; + private FileLock lock; + private String spoutId = null; + + HdfsUtils.Pair<Path,FileLock.LogEntry> lastExpiredLock = null; + private long lastExpiredLockTime = 0; + + private long tupleCounter = 0; + private boolean ackEnabled = false; + private int acksSinceLastCommit = 0 ; + private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false); + private final Timer commitTimer = new Timer(); + private boolean fileReadCompletely = false; + + private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs kerberos configs + + public HdfsSpout() { + } + + public Path getLockDirPath() { + return lockDirPath; + } + + public SpoutOutputCollector getCollector() { + return collector; + } + + public HdfsSpout withConfigKey(String configKey){ + this.configKey = configKey; + return this; + } + + public void nextTuple() { + LOG.debug("Next Tuple"); + // 1) First re-emit any previously failed tuples (from retryList) + if (!retryList.isEmpty()) { + 
LOG.debug("Sending from retry list"); + HdfsUtils.Pair<MessageId, List<Object>> pair = retryList.remove(); + emitData(pair.getValue(), pair.getKey()); + return; + } + + if( ackEnabled && tracker.size()>=maxDuplicates ) { + LOG.warn("Waiting for more ACKs before generating new tuples. " + + "Progress tracker size has reached limit {}" + , maxDuplicates); + // Don't emit anything .. allow configured spout wait strategy to kick in + return; + } + + // 2) If no failed tuples, then send tuples from hdfs + while (true) { + try { + // 3) Select a new file if one is not open already + if (reader == null) { + reader = pickNextFile(); + if (reader == null) { + LOG.info("Currently no new files to process under : " + sourceDirPath); + return; + } + } + + // 4) Read record from file, emit to collector and record progress + List<Object> tuple = reader.next(); + if (tuple != null) { + fileReadCompletely= false; + ++tupleCounter; + MessageId msgId = new MessageId(tupleCounter, reader.getFilePath(), reader.getFileOffset()); + emitData(tuple, msgId); + + if(!ackEnabled) { + ++acksSinceLastCommit; // assume message is immediately acked in non-ack mode + commitProgress(reader.getFileOffset()); + } else { + commitProgress(tracker.getCommitPosition()); + } + return; + } else { + fileReadCompletely = true; + if(!ackEnabled) { + markFileAsDone(reader.getFilePath()); + } + } + } catch (IOException e) { + LOG.error("I/O Error processing at file location " + getFileProgress(reader), e); + // don't emit anything .. allow configured spout wait strategy to kick in + return; + } catch (ParseException e) { + LOG.error("Parsing error when processing at file location " + getFileProgress(reader) + + ". Skipping remainder of file.", e); + markFileAsBad(reader.getFilePath()); + // note: Unfortunately not emitting anything here due to parse error + // will trigger the configured spout wait strategy which is unnecessary + } + } + + } + + // will commit progress into lock file if commit threshold is reached + private void commitProgress(FileOffset position) { + if ( lock!=null && canCommitNow() ) { + try { + lock.heartbeat(position.toString()); + acksSinceLastCommit = 0; + commitTimeElapsed.set(false); + setupCommitElapseTimer(); + } catch (IOException e) { + LOG.error("Unable to commit progress Will retry later.", e); + } + } + } + + private void setupCommitElapseTimer() { + if(commitFrequencySec<=0) + return; + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + commitTimeElapsed.set(false); + } + }; + commitTimer.schedule(timerTask, commitFrequencySec * 1000); + } + + + private static String getFileProgress(FileReader reader) { + return reader.getFilePath() + " " + reader.getFileOffset(); + } + + private void markFileAsDone(Path filePath) { + fileReadCompletely = false; + try { + renameCompletedFile(reader.getFilePath()); + } catch (IOException e) { + LOG.error("Unable to archive completed file" + filePath, e); + } + unlockAndCloseReader(); + + } + + private void markFileAsBad(Path file) { + String fileName = file.toString(); + String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix)); + String originalName = new Path(fileNameMinusSuffix).getName(); + Path newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName); + + LOG.info("Moving bad file to " + newFile); + try { + if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception + throw new IOException("Move failed for bad file: " + file); // convert false ret value to 
exception + } + } catch (IOException e) { + LOG.warn("Error moving bad file: " + file + ". to destination : " + newFile); + } + + unlockAndCloseReader(); + } + + private void unlockAndCloseReader() { + reader.close(); + reader = null; + try { + lock.release(); + } catch (IOException e) { + LOG.error("Unable to delete lock file : " + this.lock.getLockFile(), e); + } + lock = null; + } + + + + protected void emitData(List<Object> tuple, MessageId id) { + LOG.debug("Emitting - {}", id); + this.collector.emit(tuple, id); + inflight.put(id, tuple); + } + + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.conf = conf; + final String FILE_SYSTEM = "filesystem"; + LOG.info("Opening"); + this.collector = collector; + this.hdfsConfig = new Configuration(); + this.tupleCounter = 0; + + for( Object k : conf.keySet() ) { + String key = k.toString(); + if( ! FILE_SYSTEM.equalsIgnoreCase( key ) ) { // to support unit test only + String val = conf.get(key).toString(); + LOG.info("Config setting : " + key + " = " + val); + this.hdfsConfig.set(key, val); + } + else + this.hdfs = (FileSystem) conf.get(key); + + if(key.equalsIgnoreCase(Configs.READER_TYPE)) { + readerType = conf.get(key).toString(); + checkValidReader(readerType); + } + } + + // - Hdfs configs + this.hdfsConfig = new Configuration(); + Map<String, Object> map = (Map<String, Object>)conf.get(this.configKey); + if(map != null){ + for(String key : map.keySet()){ + this.hdfsConfig.set(key, String.valueOf(map.get(key))); + } + } + + try { + HdfsSecurityUtil.login(conf, hdfsConfig); + } catch (IOException e) { + LOG.error("Failed to open " + sourceDirPath); + throw new RuntimeException(e); + } + + // -- source dir config + if ( !conf.containsKey(Configs.SOURCE_DIR) ) { + LOG.error(Configs.SOURCE_DIR + " setting is required"); + throw new RuntimeException(Configs.SOURCE_DIR + " setting is required"); + } + this.sourceDirPath = new Path( conf.get(Configs.SOURCE_DIR).toString() ); + + // -- archive dir config + if ( !conf.containsKey(Configs.ARCHIVE_DIR) ) { + LOG.error(Configs.ARCHIVE_DIR + " setting is required"); + throw new RuntimeException(Configs.ARCHIVE_DIR + " setting is required"); + } + this.archiveDirPath = new Path( conf.get(Configs.ARCHIVE_DIR).toString() ); + + try { + if(hdfs.exists(archiveDirPath)) { + if(! hdfs.isDirectory(archiveDirPath) ) { + LOG.error("Archive directory is a file. " + archiveDirPath); + throw new RuntimeException("Archive directory is a file. " + archiveDirPath); + } + } else if(! hdfs.mkdirs(archiveDirPath) ) { + LOG.error("Unable to create archive directory. " + archiveDirPath); + throw new RuntimeException("Unable to create archive directory " + archiveDirPath); + } + } catch (IOException e) { + LOG.error("Unable to create archive dir ", e); + throw new RuntimeException("Unable to create archive directory ", e); + } + + // -- bad files dir config + if ( !conf.containsKey(Configs.BAD_DIR) ) { + LOG.error(Configs.BAD_DIR + " setting is required"); + throw new RuntimeException(Configs.BAD_DIR + " setting is required"); + } + + this.badFilesDirPath = new Path(conf.get(Configs.BAD_DIR).toString()); + + try { + if(hdfs.exists(badFilesDirPath)) { + if(! hdfs.isDirectory(badFilesDirPath) ) { + LOG.error("Bad files directory is a file: " + badFilesDirPath); + throw new RuntimeException("Bad files directory is a file: " + badFilesDirPath); + } + } else if(! 
hdfs.mkdirs(badFilesDirPath) ) { + LOG.error("Unable to create directory for bad files: " + badFilesDirPath); + throw new RuntimeException("Unable to create a directory for bad files: " + badFilesDirPath); + } + } catch (IOException e) { + LOG.error("Unable to create archive dir ", e); + throw new RuntimeException(e.getMessage(), e); + } + + // -- lock dir config + String lockDir = !conf.containsKey(Configs.LOCK_DIR) ? getDefaultLockDir(sourceDirPath) : conf.get(Configs.LOCK_DIR).toString() ; + this.lockDirPath = new Path(lockDir); + + try { + if(hdfs.exists(lockDirPath)) { + if(! hdfs.isDirectory(lockDirPath) ) { + LOG.error("Lock directory is a file: " + lockDirPath); + throw new RuntimeException("Lock directory is a file: " + lockDirPath); + } + } else if(! hdfs.mkdirs(lockDirPath) ) { + LOG.error("Unable to create lock directory: " + lockDirPath); + throw new RuntimeException("Unable to create lock directory: " + lockDirPath); + } + } catch (IOException e) { + LOG.error("Unable to create lock dir: " + lockDirPath, e); + throw new RuntimeException(e.getMessage(), e); + } + + // -- lock timeout + if( conf.get(Configs.LOCK_TIMEOUT) !=null ) + this.lockTimeoutSec = Integer.parseInt(conf.get(Configs.LOCK_TIMEOUT).toString()); + + // -- enable/disable ACKing + Object ackers = conf.get(Config.TOPOLOGY_ACKER_EXECUTORS); + if( ackers!=null ) + this.ackEnabled = ( Integer.parseInt( ackers.toString() ) > 0 ); + else + this.ackEnabled = false; + + // -- commit frequency - count + if( conf.get(Configs.COMMIT_FREQ_COUNT) != null ) + commitFrequencyCount = Integer.parseInt( conf.get(Configs.COMMIT_FREQ_COUNT).toString() ); + + // -- commit frequency - seconds + if( conf.get(Configs.COMMIT_FREQ_SEC) != null ) + commitFrequencySec = Integer.parseInt( conf.get(Configs.COMMIT_FREQ_SEC).toString() ); + + // -- max duplicate + if( conf.get(Configs.MAX_DUPLICATE) !=null ) + maxDuplicates = Integer.parseInt( conf.get(Configs.MAX_DUPLICATE).toString() ); + + // -- clocks in sync + if( conf.get(Configs.CLOCKS_INSYNC) !=null ) + clocksInSync = Boolean.parseBoolean(conf.get(Configs.CLOCKS_INSYNC).toString()); + + // -- spout id + spoutId = context.getThisComponentId(); + + // setup timer for commit elapse time tracking + setupCommitElapseTimer(); + } + + private String getDefaultLockDir(Path sourceDirPath) { + return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR; + } + + private static void checkValidReader(String readerType) { + if(readerType.equalsIgnoreCase(Configs.TEXT) || readerType.equalsIgnoreCase(Configs.SEQ) ) + return; + try { + Class<?> classType = Class.forName(readerType); + classType.getConstructor(FileSystem.class, Path.class, Map.class); + return; + } catch (ClassNotFoundException e) { + LOG.error(readerType + " not found in classpath.", e); + throw new IllegalArgumentException(readerType + " not found in classpath.", e); + } catch (NoSuchMethodException e) { + LOG.error(readerType + " is missing the expected constructor for Readers.", e); + throw new IllegalArgumentException(readerType + " is missing the expected constuctor for Readers."); + } + } + + @Override + public void ack(Object msgId) { + MessageId id = (MessageId) msgId; + inflight.remove(id); + ++acksSinceLastCommit; + tracker.recordAckedOffset(id.offset); + commitProgress(tracker.getCommitPosition()); + if(fileReadCompletely) { + markFileAsDone(reader.getFilePath()); + reader = null; + } + super.ack(msgId); + } + + private boolean canCommitNow() { + if( acksSinceLastCommit >= commitFrequencyCount ) + return 
true; + return commitTimeElapsed.get(); + } + + @Override + public void fail(Object msgId) { + super.fail(msgId); + HdfsUtils.Pair<MessageId, List<Object>> item = HdfsUtils.Pair.of(msgId, inflight.remove(msgId)); + retryList.add(item); + } + + private FileReader pickNextFile() { + try { + // 1) If there are any abandoned files, pick oldest one + lock = getOldestExpiredLock(); + if (lock != null) { + Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath); + String resumeFromOffset = lock.getLastLogEntry().fileOffset; + LOG.info("Processing abandoned file : {}", file); + return createFileReader(file, resumeFromOffset); + } + + // 2) If no abandoned files, then pick oldest file in sourceDirPath, lock it and rename it + Collection<Path> listing = HdfsUtils.listFilesByModificationTime(hdfs, sourceDirPath, 0); + + for (Path file : listing) { + if( file.getName().contains(inprogress_suffix) ) + continue; + LOG.info("Processing : {} ", file); + lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId); + if( lock==null ) { + LOG.info("Unable to get lock, so skipping file: {}", file); + continue; // could not lock, so try another file. + } + Path newFile = renameSelectedFile(file); + return createFileReader(newFile); + } + + return null; + } catch (IOException e) { + LOG.error("Unable to select next file for consumption " + sourceDirPath, e); + return null; + } + } + + /** + * If clocks in sync, then acquires the oldest expired lock + * Else, on first call, just remembers the oldest expired lock, on next call check if the lock is updated. if not updated then acquires the lock + * @return + * @throws IOException + */ + private FileLock getOldestExpiredLock() throws IOException { + // 1 - acquire lock on dir + DirLock dirlock = DirLock.tryLock(hdfs, lockDirPath); + if (dirlock == null) + return null; + try { + // 2 - if clocks are in sync then simply take ownership of the oldest expired lock + if (clocksInSync) + return FileLock.acquireOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId); + + // 3 - if clocks are not in sync .. 
+      if( lastExpiredLock == null ) {
+        // just make a note of the oldest expired lock now and check if it is still unmodified after lockTimeoutSec
+        lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId);
+        lastExpiredLockTime = System.currentTimeMillis();
+        return null;
+      }
+      // wait until lockTimeoutSec has elapsed since we last noted the lock file
+      if( !hasExpired(lastExpiredLockTime) )
+        return null;
+
+      // if the lock file has not been updated since we noted it, then own it
+      FileLock.LogEntry lastEntry = FileLock.getLastEntry(hdfs, lastExpiredLock.getKey());
+      if( lastEntry.equals(lastExpiredLock.getValue()) ) {
+        FileLock result = FileLock.takeOwnership(hdfs, lastExpiredLock.getKey(), lastEntry, spoutId);
+        lastExpiredLock = null;
+        return result;
+      } else {
+        // if the lock file has been updated since last time, then leave this lock file alone
+        lastExpiredLock = null;
+        return null;
+      }
+    } finally {
+      dirlock.release();
+    }
+  }
+
+  private boolean hasExpired(long lastModifyTime) {
+    return (System.currentTimeMillis() - lastModifyTime) >= lockTimeoutSec*1000;
+  }
+
+  /**
+   * Creates a reader that reads from the beginning of the file
+   * @param file file to read
+   * @return the reader
+   * @throws IOException
+   */
+  private FileReader createFileReader(Path file)
+          throws IOException {
+    if(readerType.equalsIgnoreCase(Configs.SEQ))
+      return new SequenceFileReader(this.hdfs, file, conf);
+    if(readerType.equalsIgnoreCase(Configs.TEXT))
+      return new TextFileReader(this.hdfs, file, conf);
+
+    try {
+      Class<?> clsType = Class.forName(readerType);
+      Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class);
+      return (FileReader) constructor.newInstance(this.hdfs, file, conf);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new RuntimeException("Unable to instantiate " + readerType, e);
+    }
+  }
+
+
+  /**
+   * Creates a reader that starts reading from 'offset'
+   * @param file the file to read
+   * @param offset the offset string; it should be understandable by the reader type being used
+   * @return the reader
+   * @throws IOException
+   */
+  private FileReader createFileReader(Path file, String offset)
+          throws IOException {
+    if(readerType.equalsIgnoreCase(Configs.SEQ))
+      return new SequenceFileReader(this.hdfs, file, conf, offset);
+    if(readerType.equalsIgnoreCase(Configs.TEXT))
+      return new TextFileReader(this.hdfs, file, conf, offset);
+
+    try {
+      Class<?> clsType = Class.forName(readerType);
+      Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class, String.class);
+      return (FileReader) constructor.newInstance(this.hdfs, file, conf, offset);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new RuntimeException("Unable to instantiate " + readerType, e);
+    }
+  }
+
+  // returns the new path of the renamed file
+  private Path renameSelectedFile(Path file)
+          throws IOException {
+    Path newFile = new Path( file.toString() + inprogress_suffix );
+    if( ! hdfs.rename(file, newFile) ) {
+      throw new IOException("Rename failed for file: " + file);
+    }
+    return newFile;
+  }
+
+  /** Returns the corresponding input file in the 'sourceDirPath' for the specified lock file.
+   * If no such file is found then returns null
+   */
+  private Path getFileForLockFile(Path lockFile, Path sourceDirPath)
+          throws IOException {
+    String lockFileName = lockFile.getName();
+    Path dataFile = new Path(sourceDirPath, lockFileName + inprogress_suffix);
+    if( hdfs.exists(dataFile) )
+      return dataFile;
+    dataFile = new Path(sourceDirPath, lockFileName);
+    if(hdfs.exists(dataFile))
+      return dataFile;
+    return null;
+  }
+
+
+  private Path renameCompletedFile(Path file) throws IOException {
+    String fileName = file.toString();
+    String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix));
+    String newName = new Path(fileNameMinusSuffix).getName();
+
+    Path newFile = new Path( archiveDirPath + Path.SEPARATOR + newName );
+    LOG.debug("Renaming completed file to " + newFile);
+    LOG.info("Completed file " + fileNameMinusSuffix );
+    if (!hdfs.rename(file, newFile) ) {
+      throw new IOException("Rename failed for file: " + file);
+    }
+    return newFile;
+  }
+
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    Fields fields = reader.getOutputFields();
+    declarer.declare(fields);
+  }
+
+  static class MessageId implements Comparable<MessageId> {
+    public long msgNumber; // tracks the order in which the msg came in
+    public String fullPath;
+    public FileOffset offset;
+
+    public MessageId(long msgNumber, Path fullPath, FileOffset offset) {
+      this.msgNumber = msgNumber;
+      this.fullPath = fullPath.toString();
+      this.offset = offset;
+    }
+
+    @Override
+    public String toString() {
+      return "{'" + fullPath + "':" + offset + "}";
+    }
+
+    @Override
+    public int compareTo(MessageId rhs) {
+      if (msgNumber<rhs.msgNumber)
+        return -1;
+      if(msgNumber>rhs.msgNumber)
+        return 1;
+      return 0;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
new file mode 100644
index 0000000..fdf7751f
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.storm.hdfs.spout; + +public class ParseException extends Exception { + public ParseException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java new file mode 100644 index 0000000..2079ef4 --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.hdfs.spout; + +import java.io.PrintStream; +import java.util.TreeSet; + +public class ProgressTracker { + + TreeSet<FileOffset> offsets = new TreeSet<>(); + + public void recordAckedOffset(FileOffset newOffset) { + if(newOffset==null) + return; + offsets.add(newOffset); + + FileOffset currHead = offsets.first(); + + if( currHead.isNextOffset(newOffset) ) { // check is a minor optimization + trimHead(); + } + } + + // remove contiguous elements from the head of the heap + // e.g.: 1,2,3,4,10,11,12,15 => 4,10,11,12,15 + private void trimHead() { + if(offsets.size()<=1) + return; + FileOffset head = offsets.first(); + FileOffset head2 = offsets.higher(head); + if( head.isNextOffset(head2) ) { + offsets.pollFirst(); + trimHead(); + } + return; + } + + public FileOffset getCommitPosition() { + if(!offsets.isEmpty()) + return offsets.first().clone(); + return null; + } + + public void dumpState(PrintStream stream) { + stream.println(offsets); + } + + public int size() { + return offsets.size(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java new file mode 100644 index 0000000..5ff7b75 --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.hdfs.spout; + +import backtype.storm.tuple.Fields; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +// Todo: Track file offsets instead of line number +public class SequenceFileReader<Key extends Writable,Value extends Writable> + extends AbstractFileReader { + private static final Logger LOG = LoggerFactory + .getLogger(SequenceFileReader.class); + private static final int DEFAULT_BUFF_SIZE = 4096; + public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes"; + + private final SequenceFile.Reader reader; + + private final SequenceFileReader.Offset offset; + + private static final String DEFAULT_KEYNAME = "key"; + private static final String DEFAULT_VALNAME = "value"; + + private String keyName; + private String valueName; + + + private final Key key; + private final Value value; + + + public SequenceFileReader(FileSystem fs, Path file, Map conf) + throws IOException { + super(fs, file, new Fields(DEFAULT_KEYNAME, DEFAULT_VALNAME)); + this.keyName = DEFAULT_KEYNAME; + this.valueName = DEFAULT_VALNAME; + int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() ); + this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) ); + this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf() ); + this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf() ); + this.offset = new SequenceFileReader.Offset(0,0,0); + } + + public SequenceFileReader(FileSystem fs, Path file, Map conf, String offset) + throws IOException { + super(fs, file, new Fields(DEFAULT_KEYNAME, DEFAULT_VALNAME)); + this.keyName = DEFAULT_KEYNAME; + this.valueName = DEFAULT_VALNAME; + int bufferSize = !conf.containsKey(BUFFER_SIZE) ? 
DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() ); + this.offset = new SequenceFileReader.Offset(offset); + this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) ); + this.reader.sync(this.offset.lastSyncPoint); + this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf() ); + this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf() ); + } + + public String getKeyName() { + return keyName; + } + + public void setKeyName(String name) { + if (name == null) + throw new IllegalArgumentException("keyName cannot be null"); + this.keyName = name; + setFields(keyName, valueName); + + } + + public String getValueName() { + return valueName; + } + + public void setValueName(String name) { + if (name == null) + throw new IllegalArgumentException("valueName cannot be null"); + this.valueName = name; + setFields(keyName, valueName); + } + + public List<Object> next() throws IOException, ParseException { + if( reader.next(key, value) ) { + ArrayList<Object> result = new ArrayList<Object>(2); + Collections.addAll(result, key, value); + offset.increment(reader.syncSeen(), reader.getPosition() ); + return result; + } + return null; + } + + @Override + public void close() { + try { + reader.close(); + } catch (IOException e) { + LOG.warn("Ignoring error when closing file " + getFilePath(), e); + } + } + + public Offset getFileOffset() { + return offset; + } + + + public static class Offset implements FileOffset { + private long lastSyncPoint; + private long recordsSinceLastSync; + private long currentRecord; + private long currRecordEndOffset; + private long prevRecordEndOffset; + + public Offset(long lastSyncPoint, long recordsSinceLastSync, long currentRecord) { + this(lastSyncPoint, recordsSinceLastSync, currentRecord, 0, 0 ); + } + + public Offset(long lastSyncPoint, long recordsSinceLastSync, long currentRecord + , long currRecordEndOffset, long prevRecordEndOffset) { + this.lastSyncPoint = lastSyncPoint; + this.recordsSinceLastSync = recordsSinceLastSync; + this.currentRecord = currentRecord; + this.prevRecordEndOffset = prevRecordEndOffset; + this.currRecordEndOffset = currRecordEndOffset; + } + + public Offset(String offset) { + try { + String[] parts = offset.split(","); + this.lastSyncPoint = Long.parseLong(parts[0].split("=")[1]); + this.recordsSinceLastSync = Long.parseLong(parts[1].split("=")[1]); + this.currentRecord = Long.parseLong(parts[2].split("=")[1]); + this.prevRecordEndOffset = 0; + this.currRecordEndOffset = 0; + } catch (Exception e) { + throw new IllegalArgumentException("'" + offset + + "' cannot be interpreted. It is not in expected format for SequenceFileReader." + + " Format e.g. 
+                " Format e.g. {sync=123:afterSync=345:record=67}");
+      }
+    }
+
+    @Override
+    public String toString() {
+      return '{' +
+              "sync=" + lastSyncPoint +
+              ":afterSync=" + recordsSinceLastSync +
+              ":record=" + currentRecord +
+              '}';
+    }
+
+    @Override
+    public boolean isNextOffset(FileOffset rhs) {
+      if (rhs instanceof Offset) {
+        Offset other = ((Offset) rhs);
+        // rhs is the immediate successor only if it is exactly one record
+        // ahead (mirrors TextFileReader.Offset.isNextOffset)
+        return other.currentRecord == currentRecord + 1;
+      }
+      return false;
+    }
+
+    @Override
+    public int compareTo(FileOffset o) {
+      Offset rhs = ((Offset) o);
+      if (currentRecord < rhs.currentRecord)
+        return -1;
+      if (currentRecord == rhs.currentRecord)
+        return 0;
+      return 1;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof Offset)) return false;
+
+      Offset that = (Offset) o;
+
+      return currentRecord == that.currentRecord;
+    }
+
+    @Override
+    public int hashCode() {
+      return (int) (currentRecord ^ (currentRecord >>> 32));
+    }
+
+    void increment(boolean syncSeen, long newBytePosition) {
+      if (!syncSeen) {
+        ++recordsSinceLastSync;
+      } else {
+        recordsSinceLastSync = 1;
+        lastSyncPoint = prevRecordEndOffset;
+      }
+      ++currentRecord;
+      prevRecordEndOffset = currRecordEndOffset;
+      currRecordEndOffset = newBytePosition; // was clobbering currentRecord with a byte position
+    }
+
+    @Override
+    public Offset clone() {
+      return new Offset(lastSyncPoint, recordsSinceLastSync, currentRecord, currRecordEndOffset, prevRecordEndOffset);
+    }
+
+  } //class Offset
+} //class

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
new file mode 100644
index 0000000..6e4a8b0
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import backtype.storm.tuple.Fields;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+// Todo: Track file offsets instead of line number
+class TextFileReader extends AbstractFileReader {
+  public static final String CHARSET = "hdfsspout.reader.charset";
+  public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
+
+  public static final String DEFAULT_FIELD_NAME = "line";
+
+  private static final int DEFAULT_BUFF_SIZE = 4096;
+
+  private static final Logger LOG = LoggerFactory.getLogger(TextFileReader.class);
+
+  private BufferedReader reader;
+  private final String charSet; // remembered so next() can count bytes in the same encoding
+  private TextFileReader.Offset offset;
+
+  public TextFileReader(FileSystem fs, Path file, Map conf) throws IOException {
+    super(fs, file, new Fields(DEFAULT_FIELD_NAME));
+    FSDataInputStream in = fs.open(file);
+    charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString();
+    int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
+    reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz);
+    offset = new TextFileReader.Offset(0,0);
+  }
+
+  public TextFileReader(FileSystem fs, Path file, Map conf, String startOffset) throws IOException {
+    super(fs, file, new Fields(DEFAULT_FIELD_NAME));
+    offset = new TextFileReader.Offset(startOffset);
+    FSDataInputStream in = fs.open(file);
+    in.seek(offset.byteOffset);
+    charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString();
+    int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
+    reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz);
+  }
+
+  public Offset getFileOffset() {
+    return offset.clone();
+  }
+
+  public List<Object> next() throws IOException, ParseException {
+    String line = reader.readLine();
+    if(line!=null) {
+      // Count bytes in the configured charset (not the platform default) and
+      // add one for the '\n' terminator, so that in.seek(byteOffset) on
+      // resume lands at the start of the next line. Note: assumes
+      // single-byte line endings; '\r\n' files would drift by one per line.
+      int strByteSize = line.getBytes(charSet).length + 1;
+      offset.increment(strByteSize);
+      return Collections.singletonList((Object) line);
+    }
+    return null;
+  }
+
+  @Override
+  public void close() {
+    try {
+      reader.close();
+    } catch (IOException e) {
+      LOG.warn("Ignoring error when closing file " + getFilePath(), e);
+    }
+  }
+
+  public static class Offset implements FileOffset {
+    long byteOffset;
+    long lineNumber;
+
+    public Offset(long byteOffset, long lineNumber) {
+      this.byteOffset = byteOffset;
+      this.lineNumber = lineNumber;
+    }
+
+    public Offset(String offset) {
+      try {
+        // toString() renders {byte=123:line=5}; strip the braces before
+        // splitting so the round trip parses cleanly.
+        String[] parts = offset.replaceAll("[{}]", "").split(":");
+        this.byteOffset = Long.parseLong(parts[0].split("=")[1]);
+        this.lineNumber = Long.parseLong(parts[1].split("=")[1]);
+      } catch (Exception e) {
+        throw new IllegalArgumentException("'" + offset +
+                "' cannot be interpreted. It is not in expected format for TextFileReader." +
+                " Format e.g. 
{byte=123:line=5}"); + } + } + + @Override + public String toString() { + return '{' + + "byte=" + byteOffset + + ":line=" + lineNumber + + '}'; + } + + @Override + public boolean isNextOffset(FileOffset rhs) { + if(rhs instanceof Offset) { + Offset other = ((Offset) rhs); + return other.byteOffset > byteOffset && + other.lineNumber == lineNumber+1; + } + return false; + } + + @Override + public int compareTo(FileOffset o) { + Offset rhs = ((Offset)o); + if(lineNumber < rhs.lineNumber) + return -1; + if(lineNumber == rhs.lineNumber) + return 0; + return 1; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Offset)) return false; + + Offset that = (Offset) o; + + if (byteOffset != that.byteOffset) + return false; + return lineNumber == that.lineNumber; + } + + @Override + public int hashCode() { + int result = (int) (byteOffset ^ (byteOffset >>> 32)); + result = 31 * result + (int) (lineNumber ^ (lineNumber >>> 32)); + return result; + } + + void increment(int delta) { + ++lineNumber; + byteOffset += delta; + } + + @Override + public Offset clone() { + return new Offset(byteOffset, lineNumber); + } + } //class Offset +} http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java new file mode 100644 index 0000000..ea4b3a3 --- /dev/null +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.storm.hdfs.spout;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+public class TestDirLock {
+
+  static MiniDFSCluster.Builder builder;
+  static MiniDFSCluster hdfsCluster;
+  static FileSystem fs;
+  static String hdfsURI;
+  static Configuration conf = new HdfsConfiguration();
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+  private Path lockDir = new Path("/tmp/lockdir");
+
+  @BeforeClass
+  public static void setupClass() throws IOException {
+    builder = new MiniDFSCluster.Builder(new Configuration());
+    hdfsCluster = builder.build();
+    fs = hdfsCluster.getFileSystem();
+    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+  }
+
+  @AfterClass
+  public static void teardownClass() throws IOException {
+    fs.close();
+    hdfsCluster.shutdown();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    // use a JUnit assertion; a plain 'assert' is a no-op unless -ea is set
+    Assert.assertTrue(fs.mkdirs(lockDir));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.delete(lockDir, true);
+  }
+
+  @Test
+  public void testConcurrentLocking() throws Exception {
+    // quiet MiniDFSCluster logging (alternatively pass -Dlog4j.configuration)
+    Logger.getRootLogger().setLevel(Level.ERROR);
+    DirLockingThread[] thds = createThreads(10, lockDir);
+    for (DirLockingThread thd : thds) {
+      thd.start();
+    }
+    System.err.println("Thread creation complete");
+    Thread.sleep(5000);
+    for (DirLockingThread thd : thds) {
+      thd.join(1000);
+      if (thd.isAlive() || !thd.cleanExit)
+        System.err.println(thd.getName() + " did not exit cleanly");
+      Assert.assertTrue(thd.cleanExit);
+    }
+
+    Path lockFile = new Path(lockDir + Path.SEPARATOR + DirLock.DIR_LOCK_FILE);
+    Assert.assertFalse(fs.exists(lockFile));
+  }
+
+
+  private DirLockingThread[] createThreads(int thdCount, Path dir)
+          throws IOException {
+    DirLockingThread[] result = new DirLockingThread[thdCount];
+    for (int i = 0; i < thdCount; i++) {
+      result[i] = new DirLockingThread(i, fs, dir);
+    }
+    return result;
+  }
+
+
+  class DirLockingThread extends Thread {
+
+    private final FileSystem fs;
+    private final Path dir;
+    public boolean cleanExit = false;
+
+    public DirLockingThread(int thdNum, FileSystem fs, Path dir) throws IOException {
+      this.fs = fs;
+      this.dir = dir;
+      setName("DirLockingThread-" + thdNum); // name this thread, not the creating thread
+    }
+
+    @Override
+    public void run() {
+      try {
+        DirLock lock;
+        do {
+          lock = DirLock.tryLock(fs, dir);
+          if (lock == null) {
+            System.out.println("Retrying lock - " + Thread.currentThread().getId());
+          }
+        } while (lock == null);
+        lock.release();
+        cleanExit = true;
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+
+  }
+}
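
For reference, a minimal sketch of the read -> checkpoint -> resume contract the readers above implement (and which HdfsSpout drives internally): read a few lines, persist getFileOffset().toString(), and reopen at that string. This driver is NOT part of the patch; TextFileReader is package-private, so the sketch assumes it lives in org.apache.storm.hdfs.spout, and "/tmp/demo.txt" is a hypothetical pre-existing HDFS text file. SequenceFileReader follows the same contract with offset strings of the form {sync=..:afterSync=..:record=..}.

    package org.apache.storm.hdfs.spout;

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical usage sketch, not part of this commit.
    public class TextFileReaderDemo {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration()); // assumes fs.defaultFS points at HDFS
        Path file = new Path("/tmp/demo.txt");               // stand-in for an existing text file

        TextFileReader reader = new TextFileReader(fs, file, null); // null conf -> defaults
        List<Object> line1 = reader.next();                  // singleton list holding the line
        List<Object> line2 = reader.next();
        String checkpoint = reader.getFileOffset().toString(); // e.g. {byte=24:line=2}
        reader.close();

        // Reopening with the serialized offset seeks past the consumed bytes,
        // so the first next() here yields line 3.
        TextFileReader resumed = new TextFileReader(fs, file, null, checkpoint);
        List<Object> line3 = resumed.next();
        System.out.println(line1 + " | " + line2 + " | " + line3);
        resumed.close();
      }
    }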
