STORM-2810: Fix resource leaks in storm-hdfs tests
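
The resource leaks fixed in the main sources below all involve java.util.Timer: HdfsBlobStoreImpl kept its cleanup Timer in a static field and cancelled only the TimerTask on shutdown, while AbstractHdfsBolt and HdfsSpout never cancelled their rotation and commit timers at all, so the timer threads outlived the components that created them. A minimal sketch of the lifecycle pattern the patch moves to (class and method names here are illustrative, not taken from the patch):

    import java.util.Timer;
    import java.util.TimerTask;

    // Sketch: the Timer is an instance field, created when the component
    // starts and cancelled when it stops, so its background thread dies with
    // the component instead of surviving in a static field.
    public class PeriodicCleanup {

        private Timer timer; // per instance, not static

        public void start(long periodMillis) {
            timer = new Timer("cleanup thread", true); // daemon, like the blobstore cleaner
            timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    // periodic work goes here
                }
            }, 0, periodMillis);
        }

        public void shutdown() {
            if (timer != null) {
                // Timer.cancel() stops the timer thread; cancelling the
                // TimerTask alone (the old HdfsBlobStoreImpl behavior)
                // leaves the thread running.
                timer.cancel();
            }
        }
    }
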
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a0308efd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a0308efd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a0308efd

Branch: refs/heads/1.1.x-branch
Commit: a0308efd6c934416206183561194f3470e415edb
Parents: ed005ab
Author: Stig Rohde Døssing <[email protected]>
Authored: Sat Nov 11 18:23:00 2017 +0100
Committer: Stig Rohde Døssing <[email protected]>
Committed: Tue Nov 14 07:53:33 2017 +0100

----------------------------------------------------------------------
 .../storm/hdfs/blobstore/HdfsBlobStoreImpl.java |   14 +-
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       |   28 +-
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  | 1484 +++++++++---------
 .../hdfs/avro/TestFixedAvroSerializer.java      |   13 +-
 .../hdfs/avro/TestGenericAvroSerializer.java    |   11 +-
 .../storm/hdfs/blobstore/BlobStoreTest.java     |  897 ++++++-----
 .../hdfs/blobstore/HdfsBlobStoreImplTest.java   |  288 ++--
 .../hdfs/bolt/AvroGenericRecordBoltTest.java    |  142 +-
 .../apache/storm/hdfs/bolt/TestHdfsBolt.java    |  108 +-
 .../storm/hdfs/bolt/TestSequenceFileBolt.java   |   71 +-
 .../apache/storm/hdfs/spout/TestDirLock.java    |  281 ++--
 .../apache/storm/hdfs/spout/TestFileLock.java   |  679 ++++----
 .../storm/hdfs/spout/TestHdfsSemantics.java     |  311 ++--
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 1235 +++++++--------
 .../storm/hdfs/spout/TestProgressTracker.java   |  179 +--
 .../storm/hdfs/testing/MiniDFSClusterRule.java  |   78 +
 16 files changed, 2945 insertions(+), 2874 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
index a4c88ce..e90e2d3 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
@@ -45,9 +45,10 @@ public class HdfsBlobStoreImpl {
     private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000l;
     private static final int BUCKETS = 1024;
-    private static final Timer timer = new Timer("HdfsBlobStore cleanup thread", true);
     private static final String BLOBSTORE_DATA = "data";
 
+    private Timer timer;
+
     public class KeyInHashDirIterator implements Iterator<String> {
         private int currentBucket = 0;
         private Iterator<String> it = null;
 
@@ -112,7 +113,6 @@ public class HdfsBlobStoreImpl {
     private Path _fullPath;
     private FileSystem _fs;
-    private TimerTask _cleanup = null;
     private Configuration _hadoopConf;
 
     // blobstore directory is private!
@@ -141,7 +141,7 @@ public class HdfsBlobStoreImpl { Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE); if (Utils.getBoolean(shouldCleanup, false)) { LOG.debug("Starting hdfs blobstore cleaner"); - _cleanup = new TimerTask() { + TimerTask cleanup = new TimerTask() { @Override public void run() { try { @@ -151,7 +151,8 @@ public class HdfsBlobStoreImpl { } } }; - timer.scheduleAtFixedRate(_cleanup, 0, FULL_CLEANUP_FREQ); + timer = new Timer("HdfsBlobStore cleanup thread", true); + timer.scheduleAtFixedRate(cleanup, 0, FULL_CLEANUP_FREQ); } } @@ -304,9 +305,8 @@ public class HdfsBlobStoreImpl { } public void shutdown() { - if (_cleanup != null) { - _cleanup.cancel(); - _cleanup = null; + if (timer != null) { + timer.cancel(); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java index 12f835c..7a2402c 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -49,6 +50,7 @@ import java.util.Timer; import java.util.TimerTask; public abstract class AbstractHdfsBolt extends BaseRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class); private static final Integer DEFAULT_RETRY_COUNT = 3; /** @@ -95,14 +97,19 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { /** * Marked as final to prevent override. Subclasses should implement the doPrepare() method. 
+ * * @param conf * @param topologyContext * @param collector */ public final void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector){ this.writeLock = new Object(); - if (this.syncPolicy == null) throw new IllegalStateException("SyncPolicy must be specified."); - if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified."); + if (this.syncPolicy == null) { + throw new IllegalStateException("SyncPolicy must be specified."); + } + if (this.rotationPolicy == null) { + throw new IllegalStateException("RotationPolicy must be specified."); + } if (this.fsUrl == null) { throw new IllegalStateException("File system URL must be specified."); } @@ -215,10 +222,8 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { } /** - * A tuple must be mapped to a writer based on two factors: - * - bolt specific logic that must separate tuples into different files in the same directory (see the avro bolt - * for an example of this) - * - the directory the tuple will be partioned into + * A tuple must be mapped to a writer based on two factors: - bolt specific logic that must separate tuples into different files in the + * same directory (see the avro bolt for an example of this) - the directory the tuple will be partioned into * * @param tuple * @return @@ -252,6 +257,11 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } + @Override + public void cleanup() { + this.rotationTimer.cancel(); + } + private void syncAllWriters() throws IOException { for (AbstractHDFSWriter writer : writers.values()) { writer.sync(); @@ -281,8 +291,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { final String partitionPath = this.partitioner.getPartitionPath(tuple); final int rotation; - if (rotationCounterMap.containsKey(partitionPath)) - { + if (rotationCounterMap.containsKey(partitionPath)) { rotation = rotationCounterMap.get(partitionPath) + 1; } else { rotation = 0; @@ -300,6 +309,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { abstract protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException; static class WritersMap extends LinkedHashMap<String, AbstractHDFSWriter> { + final long maxWriters; public WritersMap(long maxWriters) { @@ -312,4 +322,4 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { return this.size() > this.maxWriters; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index b7627f2..38a791b 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -9,13 +10,10 @@ * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ - package org.apache.storm.hdfs.spout; import java.io.IOException; @@ -47,800 +45,810 @@ import org.apache.storm.tuple.Fields; public class HdfsSpout extends BaseRichSpout { - // user configurable - private String hdfsUri; // required - private String readerType; // required - private Fields outputFields; // required + // user configurable + private String hdfsUri; // required + private String readerType; // required + private Fields outputFields; // required - private String sourceDir; // required - private Path sourceDirPath; // required + private String sourceDir; // required + private Path sourceDirPath; // required - private String archiveDir; // required - private Path archiveDirPath; // required + private String archiveDir; // required + private Path archiveDirPath; // required - private String badFilesDir; // required - private Path badFilesDirPath; // required + private String badFilesDir; // required + private Path badFilesDirPath; // required - private String lockDir; - private Path lockDirPath; + private String lockDir; + private Path lockDirPath; - private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT; - private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC; - private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING; - private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT; - private boolean clocksInSync = true; + private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT; + private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC; + private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING; + private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT; + private boolean clocksInSync = true; - private String inprogress_suffix = ".inprogress"; // not configurable to prevent change between topology restarts - private String ignoreSuffix = ".ignore"; + private String inprogress_suffix = ".inprogress"; // not configurable to prevent change between topology restarts + private String ignoreSuffix = ".ignore"; - private String outputStreamName= null; + private String outputStreamName = null; - // other members - private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class); + // other members + private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class); - private ProgressTracker tracker = null; + private ProgressTracker tracker = null; - private FileSystem hdfs; - private FileReader reader; + private FileSystem hdfs; + private FileReader reader; - private SpoutOutputCollector collector; - HashMap<MessageId, List<Object> > inflight = new HashMap<>(); - LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>(); - - private Configuration hdfsConfig; - - private Map conf = null; - private FileLock lock; - private String spoutId = null; - - HdfsUtils.Pair<Path,FileLock.LogEntry> lastExpiredLock = null; - private long lastExpiredLockTime = 0; - - private long tupleCounter = 0; - private boolean ackEnabled = false; - private int acksSinceLastCommit = 0 ; - private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false); - private Timer 
commitTimer; - private boolean fileReadCompletely = true; - - private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs Kerberos configs - - public HdfsSpout() { - } - - public HdfsSpout setHdfsUri(String hdfsUri) { - this.hdfsUri = hdfsUri; - return this; - } - - public HdfsSpout setReaderType(String readerType) { - this.readerType = readerType; - return this; - } - - public HdfsSpout setSourceDir(String sourceDir) { - this.sourceDir = sourceDir; - return this; - } - - public HdfsSpout setArchiveDir(String archiveDir) { - this.archiveDir = archiveDir; - return this; - } - - public HdfsSpout setBadFilesDir(String badFilesDir) { - this.badFilesDir = badFilesDir; - return this; - } - - public HdfsSpout setLockDir(String lockDir) { - this.lockDir = lockDir; - return this; - } - - public HdfsSpout setCommitFrequencyCount(int commitFrequencyCount) { - this.commitFrequencyCount = commitFrequencyCount; - return this; - } - - public HdfsSpout setCommitFrequencySec(int commitFrequencySec) { - this.commitFrequencySec = commitFrequencySec; - return this; - } - - public HdfsSpout setMaxOutstanding(int maxOutstanding) { - this.maxOutstanding = maxOutstanding; - return this; - } - - public HdfsSpout setLockTimeoutSec(int lockTimeoutSec) { - this.lockTimeoutSec = lockTimeoutSec; - return this; - } - - public HdfsSpout setClocksInSync(boolean clocksInSync) { - this.clocksInSync = clocksInSync; - return this; - } - - - public HdfsSpout setIgnoreSuffix(String ignoreSuffix) { - this.ignoreSuffix = ignoreSuffix; - return this; - } - - /** Output field names. Number of fields depends upon the reader type */ - public HdfsSpout withOutputFields(String... fields) { - outputFields = new Fields(fields); - return this; - } - - /** set key name under which HDFS options are placed. (similar to HDFS bolt). - * default key name is 'hdfs.config' */ - public HdfsSpout withConfigKey(String configKey) { - this.configKey = configKey; - return this; - } - - /** - * Set output stream name - */ - public HdfsSpout withOutputStream(String streamName) { - this.outputStreamName = streamName; - return this; - } - - public Path getLockDirPath() { - return lockDirPath; - } - - public SpoutOutputCollector getCollector() { - return collector; - } - - public void nextTuple() { - LOG.trace("Next Tuple {}", spoutId); - // 1) First re-emit any previously failed tuples (from retryList) - if (!retryList.isEmpty()) { - LOG.debug("Sending tuple from retry list"); - HdfsUtils.Pair<MessageId, List<Object>> pair = retryList.remove(); - emitData(pair.getValue(), pair.getKey()); - return; - } - - if ( ackEnabled && tracker.size()>= maxOutstanding ) { - LOG.warn("Waiting for more ACKs before generating new tuples. " + - "Progress tracker size has reached limit {}, SpoutID {}" - , maxOutstanding, spoutId); - // Don't emit anything .. 
allow configured spout wait strategy to kick in - return; - } - - // 2) If no failed tuples to be retried, then send tuples from hdfs - while (true) { - try { - // 3) Select a new file if one is not open already - if (reader == null) { - reader = pickNextFile(); - if (reader == null) { - LOG.debug("Currently no new files to process under : " + sourceDirPath); - return; - } else { - fileReadCompletely=false; - } - } - if ( fileReadCompletely ) { // wait for more ACKs before proceeding - return; - } - // 4) Read record from file, emit to collector and record progress - List<Object> tuple = reader.next(); - if (tuple != null) { - fileReadCompletely= false; - ++tupleCounter; - MessageId msgId = new MessageId(tupleCounter, reader.getFilePath(), reader.getFileOffset()); - emitData(tuple, msgId); - - if ( !ackEnabled ) { - ++acksSinceLastCommit; // assume message is immediately ACKed in non-ack mode - commitProgress(reader.getFileOffset()); - } else { - commitProgress(tracker.getCommitPosition()); - } - return; - } else { - fileReadCompletely = true; - if ( !ackEnabled ) { - markFileAsDone(reader.getFilePath()); - } - } - } catch (IOException e) { - LOG.error("I/O Error processing at file location " + getFileProgress(reader), e); - // don't emit anything .. allow configured spout wait strategy to kick in - return; - } catch (ParseException e) { - LOG.error("Parsing error when processing at file location " + getFileProgress(reader) + - ". Skipping remainder of file.", e); - markFileAsBad(reader.getFilePath()); - // Note: We don't return from this method on ParseException to avoid triggering the - // spout wait strategy (due to no emits). Instead we go back into the loop and - // generate a tuple from next file - } - } // while - } - - // will commit progress into lock file if commit threshold is reached - private void commitProgress(FileOffset position) { - if ( position==null ) { - return; - } - if ( lock!=null && canCommitNow() ) { - try { - String pos = position.toString(); - lock.heartbeat(pos); - LOG.debug("{} Committed progress. {}", spoutId, pos); - acksSinceLastCommit = 0; - commitTimeElapsed.set(false); - setupCommitElapseTimer(); - } catch (IOException e) { - LOG.error("Unable to commit progress Will retry later. Spout ID = " + spoutId, e); - } - } - } - - private void setupCommitElapseTimer() { - if ( commitFrequencySec<=0 ) { - return; - } - TimerTask timerTask = new TimerTask() { - @Override - public void run() { - commitTimeElapsed.set(true); - } - }; - commitTimer.schedule(timerTask, commitFrequencySec * 1000); - } - - private static String getFileProgress(FileReader reader) { - return reader.getFilePath() + " " + reader.getFileOffset(); - } - - private void markFileAsDone(Path filePath) { - try { - Path newFile = renameCompletedFile(reader.getFilePath()); - LOG.info("Completed processing {}. Spout Id = {}", newFile, spoutId); - } catch (IOException e) { - LOG.error("Unable to archive completed file" + filePath + " Spout ID " + spoutId, e); - } - closeReaderAndResetTrackers(); - } - - private void markFileAsBad(Path file) { - String fileName = file.toString(); - String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix)); - String originalName = new Path(fileNameMinusSuffix).getName(); - Path newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName); - - LOG.info("Moving bad file {} to {}. Processed it till offset {}. 
SpoutID= {}", originalName, newFile, tracker.getCommitPosition(), spoutId); - try { - if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception - throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception - } - } catch (IOException e) { - LOG.warn("Error moving bad file: " + file + " to destination " + newFile + " SpoutId =" + spoutId, e); - } - closeReaderAndResetTrackers(); - } - - private void closeReaderAndResetTrackers() { - inflight.clear(); - tracker.offsets.clear(); - retryList.clear(); - - reader.close(); - reader = null; - releaseLockAndLog(lock, spoutId); - lock = null; - } - - private static void releaseLockAndLog(FileLock fLock, String spoutId) { - try { - if ( fLock!=null ) { - fLock.release(); - LOG.debug("Spout {} released FileLock. SpoutId = {}", fLock.getLockFile(), spoutId); - } - } catch (IOException e) { - LOG.error("Unable to delete lock file : " +fLock.getLockFile() + " SpoutId =" + spoutId, e); - } - } - - protected void emitData(List<Object> tuple, MessageId id) { - LOG.trace("Emitting - {}", id); - - if ( outputStreamName==null ) - collector.emit( tuple, id ); - else - collector.emit( outputStreamName, tuple, id ); - - inflight.put(id, tuple); - } - - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - LOG.info("Opening HDFS Spout"); - this.conf = conf; - this.commitTimer = new Timer(); - this.tracker = new ProgressTracker(); - this.hdfsConfig = new Configuration(); - - this.collector = collector; - this.hdfsConfig = new Configuration(); - this.tupleCounter = 0; - - // Hdfs related settings - if ( this.hdfsUri==null && conf.containsKey(Configs.HDFS_URI) ) { - this.hdfsUri = conf.get(Configs.HDFS_URI).toString(); - } - if ( this.hdfsUri==null ) { - throw new RuntimeException("HDFS Uri not set on spout"); - } - - try { - this.hdfs = FileSystem.get(URI.create(hdfsUri), hdfsConfig); - } catch (IOException e) { - LOG.error("Unable to instantiate file system", e); - throw new RuntimeException("Unable to instantiate file system", e); - } - - - if ( conf.containsKey(configKey) ) { - Map<String, Object> map = (Map<String, Object>)conf.get(configKey); - if ( map != null ) { - for(String keyName : map.keySet()){ - LOG.info("HDFS Config override : {} = {} ", keyName, String.valueOf(map.get(keyName))); - this.hdfsConfig.set(keyName, String.valueOf(map.get(keyName))); - } - try { - HdfsSecurityUtil.login(conf, hdfsConfig); - } catch (IOException e) { - LOG.error("HDFS Login failed ", e); - throw new RuntimeException(e); - } - } // if (map != null) - } + private SpoutOutputCollector collector; + HashMap<MessageId, List<Object>> inflight = new HashMap<>(); + LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>(); + + private Configuration hdfsConfig; + + private Map conf = null; + private FileLock lock; + private String spoutId = null; + + HdfsUtils.Pair<Path, FileLock.LogEntry> lastExpiredLock = null; + private long lastExpiredLockTime = 0; + + private long tupleCounter = 0; + private boolean ackEnabled = false; + private int acksSinceLastCommit = 0; + private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false); + private Timer commitTimer; + private boolean fileReadCompletely = true; + + private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs Kerberos configs - // Reader type config - if ( readerType==null && conf.containsKey(Configs.READER_TYPE) ) { - readerType = 
conf.get(Configs.READER_TYPE).toString(); + public HdfsSpout() { } - checkValidReader(readerType); - // -- source dir config - if ( sourceDir==null && conf.containsKey(Configs.SOURCE_DIR) ) { - sourceDir = conf.get(Configs.SOURCE_DIR).toString(); + public HdfsSpout setHdfsUri(String hdfsUri) { + this.hdfsUri = hdfsUri; + return this; } - if ( sourceDir==null ) { - LOG.error(Configs.SOURCE_DIR + " setting is required"); - throw new RuntimeException(Configs.SOURCE_DIR + " setting is required"); + + public HdfsSpout setReaderType(String readerType) { + this.readerType = readerType; + return this; + } + + public HdfsSpout setSourceDir(String sourceDir) { + this.sourceDir = sourceDir; + return this; } - this.sourceDirPath = new Path( sourceDir ); - // -- archive dir config - if ( archiveDir==null && conf.containsKey(Configs.ARCHIVE_DIR) ) { - archiveDir = conf.get(Configs.ARCHIVE_DIR).toString(); + public HdfsSpout setArchiveDir(String archiveDir) { + this.archiveDir = archiveDir; + return this; } - if ( archiveDir==null ) { - LOG.error(Configs.ARCHIVE_DIR + " setting is required"); - throw new RuntimeException(Configs.ARCHIVE_DIR + " setting is required"); + + public HdfsSpout setBadFilesDir(String badFilesDir) { + this.badFilesDir = badFilesDir; + return this; + } + + public HdfsSpout setLockDir(String lockDir) { + this.lockDir = lockDir; + return this; } - this.archiveDirPath = new Path( archiveDir ); - validateOrMakeDir(hdfs, archiveDirPath, "Archive"); - // -- bad files dir config - if ( badFilesDir==null && conf.containsKey(Configs.BAD_DIR) ) { - badFilesDir = conf.get(Configs.BAD_DIR).toString(); + public HdfsSpout setCommitFrequencyCount(int commitFrequencyCount) { + this.commitFrequencyCount = commitFrequencyCount; + return this; } - if ( badFilesDir==null ) { - LOG.error(Configs.BAD_DIR + " setting is required"); - throw new RuntimeException(Configs.BAD_DIR + " setting is required"); + + public HdfsSpout setCommitFrequencySec(int commitFrequencySec) { + this.commitFrequencySec = commitFrequencySec; + return this; + } + + public HdfsSpout setMaxOutstanding(int maxOutstanding) { + this.maxOutstanding = maxOutstanding; + return this; + } + + public HdfsSpout setLockTimeoutSec(int lockTimeoutSec) { + this.lockTimeoutSec = lockTimeoutSec; + return this; } - this.badFilesDirPath = new Path(badFilesDir); - validateOrMakeDir(hdfs, badFilesDirPath, "bad files"); - // -- ignore file names config - if ( conf.containsKey(Configs.IGNORE_SUFFIX) ) { - this.ignoreSuffix = conf.get(Configs.IGNORE_SUFFIX).toString(); + public HdfsSpout setClocksInSync(boolean clocksInSync) { + this.clocksInSync = clocksInSync; + return this; } - // -- lock dir config - if ( lockDir==null && conf.containsKey(Configs.LOCK_DIR) ) { - lockDir = conf.get(Configs.LOCK_DIR).toString(); + public HdfsSpout setIgnoreSuffix(String ignoreSuffix) { + this.ignoreSuffix = ignoreSuffix; + return this; } - if ( lockDir==null ) { - lockDir = getDefaultLockDir(sourceDirPath); + + /** + * Output field names. Number of fields depends upon the reader type + */ + public HdfsSpout withOutputFields(String... fields) { + outputFields = new Fields(fields); + return this; } - this.lockDirPath = new Path(lockDir); - validateOrMakeDir(hdfs,lockDirPath, "locks"); + /** + * set key name under which HDFS options are placed. (similar to HDFS bolt). 
default key name is 'hdfs.config' + */ + public HdfsSpout withConfigKey(String configKey) { + this.configKey = configKey; + return this; + } - // -- lock timeout - if ( conf.get(Configs.LOCK_TIMEOUT) !=null ) { - this.lockTimeoutSec = Integer.parseInt(conf.get(Configs.LOCK_TIMEOUT).toString()); + /** + * Set output stream name + */ + public HdfsSpout withOutputStream(String streamName) { + this.outputStreamName = streamName; + return this; } - // -- enable/disable ACKing - Object ackers = conf.get(Config.TOPOLOGY_ACKER_EXECUTORS); - if ( ackers!=null ) { - int ackerCount = Integer.parseInt(ackers.toString()); - this.ackEnabled = (ackerCount>0); - LOG.debug("ACKer count = {}", ackerCount); + public Path getLockDirPath() { + return lockDirPath; } - else { // ackers==null when ackerCount not explicitly set on the topology - this.ackEnabled = true; - LOG.debug("ACK count not explicitly set on topology."); + + public SpoutOutputCollector getCollector() { + return collector; } - LOG.info("ACK mode is {}", ackEnabled ? "enabled" : "disabled"); + public void nextTuple() { + LOG.trace("Next Tuple {}", spoutId); + // 1) First re-emit any previously failed tuples (from retryList) + if (!retryList.isEmpty()) { + LOG.debug("Sending tuple from retry list"); + HdfsUtils.Pair<MessageId, List<Object>> pair = retryList.remove(); + emitData(pair.getValue(), pair.getKey()); + return; + } - // -- commit frequency - count - if ( conf.get(Configs.COMMIT_FREQ_COUNT) != null ) { - commitFrequencyCount = Integer.parseInt(conf.get(Configs.COMMIT_FREQ_COUNT).toString()); + if (ackEnabled && tracker.size() >= maxOutstanding) { + LOG.warn("Waiting for more ACKs before generating new tuples. " + + "Progress tracker size has reached limit {}, SpoutID {}", + maxOutstanding, spoutId); + // Don't emit anything .. allow configured spout wait strategy to kick in + return; + } + + // 2) If no failed tuples to be retried, then send tuples from hdfs + while (true) { + try { + // 3) Select a new file if one is not open already + if (reader == null) { + reader = pickNextFile(); + if (reader == null) { + LOG.debug("Currently no new files to process under : " + sourceDirPath); + return; + } else { + fileReadCompletely = false; + } + } + if (fileReadCompletely) { // wait for more ACKs before proceeding + return; + } + // 4) Read record from file, emit to collector and record progress + List<Object> tuple = reader.next(); + if (tuple != null) { + fileReadCompletely = false; + ++tupleCounter; + MessageId msgId = new MessageId(tupleCounter, reader.getFilePath(), reader.getFileOffset()); + emitData(tuple, msgId); + + if (!ackEnabled) { + ++acksSinceLastCommit; // assume message is immediately ACKed in non-ack mode + commitProgress(reader.getFileOffset()); + } else { + commitProgress(tracker.getCommitPosition()); + } + return; + } else { + fileReadCompletely = true; + if (!ackEnabled) { + markFileAsDone(reader.getFilePath()); + } + } + } catch (IOException e) { + LOG.error("I/O Error processing at file location " + getFileProgress(reader), e); + // don't emit anything .. allow configured spout wait strategy to kick in + return; + } catch (ParseException e) { + LOG.error("Parsing error when processing at file location " + getFileProgress(reader) + + ". Skipping remainder of file.", e); + markFileAsBad(reader.getFilePath()); + // Note: We don't return from this method on ParseException to avoid triggering the + // spout wait strategy (due to no emits). 
Instead we go back into the loop and + // generate a tuple from next file + } + } // while + } + + // will commit progress into lock file if commit threshold is reached + private void commitProgress(FileOffset position) { + if (position == null) { + return; + } + if (lock != null && canCommitNow()) { + try { + String pos = position.toString(); + lock.heartbeat(pos); + LOG.debug("{} Committed progress. {}", spoutId, pos); + acksSinceLastCommit = 0; + commitTimeElapsed.set(false); + setupCommitElapseTimer(); + } catch (IOException e) { + LOG.error("Unable to commit progress Will retry later. Spout ID = " + spoutId, e); + } + } } - // -- commit frequency - seconds - if ( conf.get(Configs.COMMIT_FREQ_SEC) != null ) { - commitFrequencySec = Integer.parseInt(conf.get(Configs.COMMIT_FREQ_SEC).toString()); - if ( commitFrequencySec<=0 ) { - throw new RuntimeException(Configs.COMMIT_FREQ_SEC + " setting must be greater than 0"); - } - } - - // -- max outstanding tuples - if ( conf.get(Configs.MAX_OUTSTANDING) !=null ) { - maxOutstanding = Integer.parseInt(conf.get(Configs.MAX_OUTSTANDING).toString()); - } + private void setupCommitElapseTimer() { + if (commitFrequencySec <= 0) { + return; + } + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + commitTimeElapsed.set(true); + } + }; + commitTimer.schedule(timerTask, commitFrequencySec * 1000); + } - // -- clocks in sync - if ( conf.get(Configs.CLOCKS_INSYNC) !=null ) { - clocksInSync = Boolean.parseBoolean(conf.get(Configs.CLOCKS_INSYNC).toString()); + private static String getFileProgress(FileReader reader) { + return reader.getFilePath() + " " + reader.getFileOffset(); } - // -- spout id - spoutId = context.getThisComponentId(); + private void markFileAsDone(Path filePath) { + try { + Path newFile = renameCompletedFile(reader.getFilePath()); + LOG.info("Completed processing {}. Spout Id = {}", newFile, spoutId); + } catch (IOException e) { + LOG.error("Unable to archive completed file" + filePath + " Spout ID " + spoutId, e); + } + closeReaderAndResetTrackers(); + } - // setup timer for commit elapse time tracking - setupCommitElapseTimer(); - } + private void markFileAsBad(Path file) { + String fileName = file.toString(); + String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix)); + String originalName = new Path(fileNameMinusSuffix).getName(); + Path newFile = new Path(badFilesDirPath + Path.SEPARATOR + originalName); - private static void validateOrMakeDir(FileSystem fs, Path dir, String dirDescription) { - try { - if ( fs.exists(dir) ) { - if ( !fs.isDirectory(dir) ) { - LOG.error(dirDescription + " directory is a file, not a dir. " + dir); - throw new RuntimeException(dirDescription + " directory is a file, not a dir. " + dir); + LOG.info("Moving bad file {} to {}. Processed it till offset {}. SpoutID= {}", originalName, newFile, tracker.getCommitPosition(), spoutId); + try { + if (!hdfs.rename(file, newFile)) { // seems this can fail by returning false or throwing exception + throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception + } + } catch (IOException e) { + LOG.warn("Error moving bad file: " + file + " to destination " + newFile + " SpoutId =" + spoutId, e); } - } else if ( ! 
fs.mkdirs(dir) ) { - LOG.error("Unable to create " + dirDescription + " directory " + dir); - throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir); - } - } catch (IOException e) { - LOG.error("Unable to create " + dirDescription + " directory " + dir, e); - throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir, e); + closeReaderAndResetTrackers(); } - } - private String getDefaultLockDir(Path sourceDirPath) { - return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR; - } + private void closeReaderAndResetTrackers() { + inflight.clear(); + tracker.offsets.clear(); + retryList.clear(); + + reader.close(); + reader = null; + releaseLockAndLog(lock, spoutId); + lock = null; + } - private static void checkValidReader(String readerType) { - if ( readerType.equalsIgnoreCase(Configs.TEXT) || readerType.equalsIgnoreCase(Configs.SEQ) ) - return; - try { - Class<?> classType = Class.forName(readerType); - classType.getConstructor(FileSystem.class, Path.class, Map.class); - return; - } catch (ClassNotFoundException e) { - LOG.error(readerType + " not found in classpath.", e); - throw new IllegalArgumentException(readerType + " not found in classpath.", e); - } catch (NoSuchMethodException e) { - LOG.error(readerType + " is missing the expected constructor for Readers.", e); - throw new IllegalArgumentException(readerType + " is missing the expected constuctor for Readers."); + private static void releaseLockAndLog(FileLock fLock, String spoutId) { + try { + if (fLock != null) { + fLock.release(); + LOG.debug("Spout {} released FileLock. SpoutId = {}", fLock.getLockFile(), spoutId); + } + } catch (IOException e) { + LOG.error("Unable to delete lock file : " + fLock.getLockFile() + " SpoutId =" + spoutId, e); + } } - } - @Override - public void ack(Object msgId) { - LOG.trace("Ack received for msg {} on spout {}", msgId, spoutId); - if ( !ackEnabled ) { - return; + protected void emitData(List<Object> tuple, MessageId id) { + LOG.trace("Emitting - {}", id); + + if (outputStreamName == null) { + collector.emit(tuple, id); + } else { + collector.emit(outputStreamName, tuple, id); + } + + inflight.put(id, tuple); } - MessageId id = (MessageId) msgId; - inflight.remove(id); - ++acksSinceLastCommit; - tracker.recordAckedOffset(id.offset); - commitProgress(tracker.getCommitPosition()); - if ( fileReadCompletely && inflight.isEmpty() ) { - markFileAsDone(reader.getFilePath()); - reader = null; - } - super.ack(msgId); - } - - private boolean canCommitNow() { - - if ( commitFrequencyCount>0 && acksSinceLastCommit >= commitFrequencyCount ) { - return true; - } - return commitTimeElapsed.get(); - } - - @Override - public void fail(Object msgId) { - LOG.trace("Fail received for msg id {} on spout {}", msgId, spoutId); - super.fail(msgId); - if ( ackEnabled ) { - HdfsUtils.Pair<MessageId, List<Object>> item = HdfsUtils.Pair.of(msgId, inflight.remove(msgId)); - retryList.add(item); - } - } - - private FileReader pickNextFile() { - try { - // 1) If there are any abandoned files, pick oldest one - lock = getOldestExpiredLock(); - if ( lock!=null ) { - LOG.debug("Spout {} now took over ownership of abandoned FileLock {}", spoutId, lock.getLockFile()); - Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath); - String resumeFromOffset = lock.getLastLogEntry().fileOffset; - LOG.info("Resuming processing of abandoned file : {}", file); - return createFileReader(file, resumeFromOffset); - } - - // 2) If no abandoned files, then 
pick oldest file in sourceDirPath, lock it and rename it - Collection<Path> listing = HdfsUtils.listFilesByModificationTime(hdfs, sourceDirPath, 0); - - for (Path file : listing) { - if (file.getName().endsWith(inprogress_suffix)) { - continue; - } - if (file.getName().endsWith(ignoreSuffix)) { - continue; - } - lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId); - if (lock == null) { - LOG.debug("Unable to get FileLock for {}, so skipping it.", file); - continue; // could not lock, so try another file. + + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + LOG.info("Opening HDFS Spout"); + this.conf = conf; + this.commitTimer = new Timer(); + this.tracker = new ProgressTracker(); + this.hdfsConfig = new Configuration(); + + this.collector = collector; + this.hdfsConfig = new Configuration(); + this.tupleCounter = 0; + + // Hdfs related settings + if (this.hdfsUri == null && conf.containsKey(Configs.HDFS_URI)) { + this.hdfsUri = conf.get(Configs.HDFS_URI).toString(); + } + if (this.hdfsUri == null) { + throw new RuntimeException("HDFS Uri not set on spout"); } + try { - Path newFile = renameToInProgressFile(file); - FileReader result = createFileReader(newFile); - LOG.info("Processing : {} ", file); - return result; + this.hdfs = FileSystem.get(URI.create(hdfsUri), hdfsConfig); + } catch (IOException e) { + LOG.error("Unable to instantiate file system", e); + throw new RuntimeException("Unable to instantiate file system", e); + } + + if (conf.containsKey(configKey)) { + Map<String, Object> map = (Map<String, Object>) conf.get(configKey); + if (map != null) { + for (String keyName : map.keySet()) { + LOG.info("HDFS Config override : {} = {} ", keyName, String.valueOf(map.get(keyName))); + this.hdfsConfig.set(keyName, String.valueOf(map.get(keyName))); + } + try { + HdfsSecurityUtil.login(conf, hdfsConfig); + } catch (IOException e) { + LOG.error("HDFS Login failed ", e); + throw new RuntimeException(e); + } + } // if (map != null) + } + + // Reader type config + if (readerType == null && conf.containsKey(Configs.READER_TYPE)) { + readerType = conf.get(Configs.READER_TYPE).toString(); + } + checkValidReader(readerType); + + // -- source dir config + if (sourceDir == null && conf.containsKey(Configs.SOURCE_DIR)) { + sourceDir = conf.get(Configs.SOURCE_DIR).toString(); + } + if (sourceDir == null) { + LOG.error(Configs.SOURCE_DIR + " setting is required"); + throw new RuntimeException(Configs.SOURCE_DIR + " setting is required"); + } + this.sourceDirPath = new Path(sourceDir); + + // -- archive dir config + if (archiveDir == null && conf.containsKey(Configs.ARCHIVE_DIR)) { + archiveDir = conf.get(Configs.ARCHIVE_DIR).toString(); + } + if (archiveDir == null) { + LOG.error(Configs.ARCHIVE_DIR + " setting is required"); + throw new RuntimeException(Configs.ARCHIVE_DIR + " setting is required"); + } + this.archiveDirPath = new Path(archiveDir); + validateOrMakeDir(hdfs, archiveDirPath, "Archive"); + + // -- bad files dir config + if (badFilesDir == null && conf.containsKey(Configs.BAD_DIR)) { + badFilesDir = conf.get(Configs.BAD_DIR).toString(); + } + if (badFilesDir == null) { + LOG.error(Configs.BAD_DIR + " setting is required"); + throw new RuntimeException(Configs.BAD_DIR + " setting is required"); + } + this.badFilesDirPath = new Path(badFilesDir); + validateOrMakeDir(hdfs, badFilesDirPath, "bad files"); + + // -- ignore file names config + if (conf.containsKey(Configs.IGNORE_SUFFIX)) { + this.ignoreSuffix = 
conf.get(Configs.IGNORE_SUFFIX).toString(); + } + + // -- lock dir config + if (lockDir == null && conf.containsKey(Configs.LOCK_DIR)) { + lockDir = conf.get(Configs.LOCK_DIR).toString(); + } + if (lockDir == null) { + lockDir = getDefaultLockDir(sourceDirPath); + } + this.lockDirPath = new Path(lockDir); + validateOrMakeDir(hdfs, lockDirPath, "locks"); + + // -- lock timeout + if (conf.get(Configs.LOCK_TIMEOUT) != null) { + this.lockTimeoutSec = Integer.parseInt(conf.get(Configs.LOCK_TIMEOUT).toString()); + } + + // -- enable/disable ACKing + Object ackers = conf.get(Config.TOPOLOGY_ACKER_EXECUTORS); + if (ackers != null) { + int ackerCount = Integer.parseInt(ackers.toString()); + this.ackEnabled = (ackerCount > 0); + LOG.debug("ACKer count = {}", ackerCount); + } else { // ackers==null when ackerCount not explicitly set on the topology + this.ackEnabled = true; + LOG.debug("ACK count not explicitly set on topology."); + } + + LOG.info("ACK mode is {}", ackEnabled ? "enabled" : "disabled"); + + // -- commit frequency - count + if (conf.get(Configs.COMMIT_FREQ_COUNT) != null) { + commitFrequencyCount = Integer.parseInt(conf.get(Configs.COMMIT_FREQ_COUNT).toString()); + } + + // -- commit frequency - seconds + if (conf.get(Configs.COMMIT_FREQ_SEC) != null) { + commitFrequencySec = Integer.parseInt(conf.get(Configs.COMMIT_FREQ_SEC).toString()); + if (commitFrequencySec <= 0) { + throw new RuntimeException(Configs.COMMIT_FREQ_SEC + " setting must be greater than 0"); + } + } + + // -- max outstanding tuples + if (conf.get(Configs.MAX_OUTSTANDING) != null) { + maxOutstanding = Integer.parseInt(conf.get(Configs.MAX_OUTSTANDING).toString()); + } + + // -- clocks in sync + if (conf.get(Configs.CLOCKS_INSYNC) != null) { + clocksInSync = Boolean.parseBoolean(conf.get(Configs.CLOCKS_INSYNC).toString()); + } + + // -- spout id + spoutId = context.getThisComponentId(); + + // setup timer for commit elapse time tracking + setupCommitElapseTimer(); + } + + @Override + public void close() { + this.commitTimer.cancel(); + } + + private static void validateOrMakeDir(FileSystem fs, Path dir, String dirDescription) { + try { + if (fs.exists(dir)) { + if (!fs.isDirectory(dir)) { + LOG.error(dirDescription + " directory is a file, not a dir. " + dir); + throw new RuntimeException(dirDescription + " directory is a file, not a dir. 
" + dir); + } + } else if (!fs.mkdirs(dir)) { + LOG.error("Unable to create " + dirDescription + " directory " + dir); + throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir); + } + } catch (IOException e) { + LOG.error("Unable to create " + dirDescription + " directory " + dir, e); + throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir, e); + } + } + + private String getDefaultLockDir(Path sourceDirPath) { + return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR; + } + + private static void checkValidReader(String readerType) { + if (readerType.equalsIgnoreCase(Configs.TEXT) || readerType.equalsIgnoreCase(Configs.SEQ)) { + return; + } + try { + Class<?> classType = Class.forName(readerType); + classType.getConstructor(FileSystem.class, Path.class, Map.class); + return; + } catch (ClassNotFoundException e) { + LOG.error(readerType + " not found in classpath.", e); + throw new IllegalArgumentException(readerType + " not found in classpath.", e); + } catch (NoSuchMethodException e) { + LOG.error(readerType + " is missing the expected constructor for Readers.", e); + throw new IllegalArgumentException(readerType + " is missing the expected constuctor for Readers."); + } + } + + @Override + public void ack(Object msgId) { + LOG.trace("Ack received for msg {} on spout {}", msgId, spoutId); + if (!ackEnabled) { + return; + } + MessageId id = (MessageId) msgId; + inflight.remove(id); + ++acksSinceLastCommit; + tracker.recordAckedOffset(id.offset); + commitProgress(tracker.getCommitPosition()); + if (fileReadCompletely && inflight.isEmpty()) { + markFileAsDone(reader.getFilePath()); + reader = null; + } + super.ack(msgId); + } + + private boolean canCommitNow() { + + if (commitFrequencyCount > 0 && acksSinceLastCommit >= commitFrequencyCount) { + return true; + } + return commitTimeElapsed.get(); + } + + @Override + public void fail(Object msgId) { + LOG.trace("Fail received for msg id {} on spout {}", msgId, spoutId); + super.fail(msgId); + if (ackEnabled) { + HdfsUtils.Pair<MessageId, List<Object>> item = HdfsUtils.Pair.of(msgId, inflight.remove(msgId)); + retryList.add(item); + } + } + + private FileReader pickNextFile() { + try { + // 1) If there are any abandoned files, pick oldest one + lock = getOldestExpiredLock(); + if (lock != null) { + LOG.debug("Spout {} now took over ownership of abandoned FileLock {}", spoutId, lock.getLockFile()); + Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath); + String resumeFromOffset = lock.getLastLogEntry().fileOffset; + LOG.info("Resuming processing of abandoned file : {}", file); + return createFileReader(file, resumeFromOffset); + } + + // 2) If no abandoned files, then pick oldest file in sourceDirPath, lock it and rename it + Collection<Path> listing = HdfsUtils.listFilesByModificationTime(hdfs, sourceDirPath, 0); + + for (Path file : listing) { + if (file.getName().endsWith(inprogress_suffix)) { + continue; + } + if (file.getName().endsWith(ignoreSuffix)) { + continue; + } + lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId); + if (lock == null) { + LOG.debug("Unable to get FileLock for {}, so skipping it.", file); + continue; // could not lock, so try another file. 
+ } + try { + Path newFile = renameToInProgressFile(file); + FileReader result = createFileReader(newFile); + LOG.info("Processing : {} ", file); + return result; + } catch (Exception e) { + LOG.error("Skipping file " + file, e); + releaseLockAndLog(lock, spoutId); + continue; + } + } + + return null; + } catch (IOException e) { + LOG.error("Unable to select next file for consumption " + sourceDirPath, e); + return null; + } + } + + /** + * If clocks in sync, then acquires the oldest expired lock Else, on first call, just remembers the oldest expired lock, on next call + * check if the lock is updated. if not updated then acquires the lock + * + * @return a lock object + * @throws IOException + */ + private FileLock getOldestExpiredLock() throws IOException { + // 1 - acquire lock on dir + DirLock dirlock = DirLock.tryLock(hdfs, lockDirPath); + if (dirlock == null) { + dirlock = DirLock.takeOwnershipIfStale(hdfs, lockDirPath, lockTimeoutSec); + if (dirlock == null) { + LOG.debug("Spout {} could not take over ownership of DirLock for {}", spoutId, lockDirPath); + return null; + } + LOG.debug("Spout {} now took over ownership of abandoned DirLock for {}", spoutId, lockDirPath); + } else { + LOG.debug("Spout {} now owns DirLock for {}", spoutId, lockDirPath); + } + + try { + // 2 - if clocks are in sync then simply take ownership of the oldest expired lock + if (clocksInSync) { + return FileLock.acquireOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId); + } + + // 3 - if clocks are not in sync .. + if (lastExpiredLock == null) { + // just make a note of the oldest expired lock now and check if its still unmodified after lockTimeoutSec + lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec); + lastExpiredLockTime = System.currentTimeMillis(); + return null; + } + // see if lockTimeoutSec time has elapsed since we last selected the lock file + if (hasExpired(lastExpiredLockTime)) { + return null; + } + + // If lock file has expired, then own it + FileLock.LogEntry lastEntry = FileLock.getLastEntry(hdfs, lastExpiredLock.getKey()); + if (lastEntry.equals(lastExpiredLock.getValue())) { + FileLock result = FileLock.takeOwnership(hdfs, lastExpiredLock.getKey(), lastEntry, spoutId); + lastExpiredLock = null; + return result; + } else { + // if lock file has been updated since last time, then leave this lock file alone + lastExpiredLock = null; + return null; + } + } finally { + dirlock.release(); + LOG.debug("Released DirLock {}, SpoutID {} ", dirlock.getLockFile(), spoutId); + } + } + + private boolean hasExpired(long lastModifyTime) { + return (System.currentTimeMillis() - lastModifyTime) < lockTimeoutSec * 1000; + } + + /** + * Creates a reader that reads from beginning of file + * + * @param file file to read + * @return + * @throws IOException + */ + private FileReader createFileReader(Path file) + throws IOException { + if (readerType.equalsIgnoreCase(Configs.SEQ)) { + return new SequenceFileReader(this.hdfs, file, conf); + } + if (readerType.equalsIgnoreCase(Configs.TEXT)) { + return new TextFileReader(this.hdfs, file, conf); + } + try { + Class<?> clsType = Class.forName(readerType); + Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class); + return (FileReader) constructor.newInstance(this.hdfs, file, conf); } catch (Exception e) { - LOG.error("Skipping file " + file, e); - releaseLockAndLog(lock, spoutId); - continue; - } - } - - return null; - } catch (IOException e) { - LOG.error("Unable to select next file 
for consumption " + sourceDirPath, e); - return null; - } - } - - /** - * If clocks in sync, then acquires the oldest expired lock - * Else, on first call, just remembers the oldest expired lock, on next call check if the lock is updated. if not updated then acquires the lock - * @return a lock object - * @throws IOException - */ - private FileLock getOldestExpiredLock() throws IOException { - // 1 - acquire lock on dir - DirLock dirlock = DirLock.tryLock(hdfs, lockDirPath); - if (dirlock == null) { - dirlock = DirLock.takeOwnershipIfStale(hdfs, lockDirPath, lockTimeoutSec); - if (dirlock == null) { - LOG.debug("Spout {} could not take over ownership of DirLock for {}", spoutId, lockDirPath); - return null; - } - LOG.debug("Spout {} now took over ownership of abandoned DirLock for {}", spoutId, lockDirPath); - } else { - LOG.debug("Spout {} now owns DirLock for {}", spoutId, lockDirPath); - } - - try { - // 2 - if clocks are in sync then simply take ownership of the oldest expired lock - if (clocksInSync) { - return FileLock.acquireOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId); - } - - // 3 - if clocks are not in sync .. - if ( lastExpiredLock == null ) { - // just make a note of the oldest expired lock now and check if its still unmodified after lockTimeoutSec - lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec); - lastExpiredLockTime = System.currentTimeMillis(); - return null; - } - // see if lockTimeoutSec time has elapsed since we last selected the lock file - if ( hasExpired(lastExpiredLockTime) ) { - return null; - } - - // If lock file has expired, then own it - FileLock.LogEntry lastEntry = FileLock.getLastEntry(hdfs, lastExpiredLock.getKey()); - if ( lastEntry.equals(lastExpiredLock.getValue()) ) { - FileLock result = FileLock.takeOwnership(hdfs, lastExpiredLock.getKey(), lastEntry, spoutId); - lastExpiredLock = null; - return result; - } else { - // if lock file has been updated since last time, then leave this lock file alone - lastExpiredLock = null; + LOG.error(e.getMessage(), e); + throw new RuntimeException("Unable to instantiate " + readerType + " reader", e); + } + } + + /** + * Creates a reader that starts reading from 'offset' + * + * @param file the file to read + * @param offset the offset string should be understandable by the reader type being used + * @return + * @throws IOException + */ + private FileReader createFileReader(Path file, String offset) + throws IOException { + if (readerType.equalsIgnoreCase(Configs.SEQ)) { + return new SequenceFileReader(this.hdfs, file, conf, offset); + } + if (readerType.equalsIgnoreCase(Configs.TEXT)) { + return new TextFileReader(this.hdfs, file, conf, offset); + } + + try { + Class<?> clsType = Class.forName(readerType); + Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class, String.class); + return (FileReader) constructor.newInstance(this.hdfs, file, conf, offset); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException("Unable to instantiate " + readerType, e); + } + } + + /** + * Renames files with .inprogress suffix + * + * @return path of renamed file + * @throws if operation fails + */ + private Path renameToInProgressFile(Path file) + throws IOException { + Path newFile = new Path(file.toString() + inprogress_suffix); + try { + if (hdfs.rename(file, newFile)) { + return newFile; + } + throw new RenameException(file, newFile); + } catch (IOException e) { + throw new RenameException(file, newFile, e); + 
} + } + + /** + * Returns the corresponding input file in the 'sourceDirPath' for the specified lock file. If no such file is found then returns null + */ + private Path getFileForLockFile(Path lockFile, Path sourceDirPath) + throws IOException { + String lockFileName = lockFile.getName(); + Path dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName + inprogress_suffix); + if (hdfs.exists(dataFile)) { + return dataFile; + } + dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName); + if (hdfs.exists(dataFile)) { + return dataFile; + } return null; - } - } finally { - dirlock.release(); - LOG.debug("Released DirLock {}, SpoutID {} ", dirlock.getLockFile(), spoutId); - } - } - - private boolean hasExpired(long lastModifyTime) { - return (System.currentTimeMillis() - lastModifyTime ) < lockTimeoutSec*1000; - } - - /** - * Creates a reader that reads from beginning of file - * @param file file to read - * @return - * @throws IOException - */ - private FileReader createFileReader(Path file) - throws IOException { - if ( readerType.equalsIgnoreCase(Configs.SEQ) ) { - return new SequenceFileReader(this.hdfs, file, conf); - } - if ( readerType.equalsIgnoreCase(Configs.TEXT) ) { - return new TextFileReader(this.hdfs, file, conf); - } - try { - Class<?> clsType = Class.forName(readerType); - Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class); - return (FileReader) constructor.newInstance(this.hdfs, file, conf); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException("Unable to instantiate " + readerType + " reader", e); - } - } - - - /** - * Creates a reader that starts reading from 'offset' - * @param file the file to read - * @param offset the offset string should be understandable by the reader type being used - * @return - * @throws IOException - */ - private FileReader createFileReader(Path file, String offset) - throws IOException { - if ( readerType.equalsIgnoreCase(Configs.SEQ) ) { - return new SequenceFileReader(this.hdfs, file, conf, offset); - } - if ( readerType.equalsIgnoreCase(Configs.TEXT) ) { - return new TextFileReader(this.hdfs, file, conf, offset); - } - - try { - Class<?> clsType = Class.forName(readerType); - Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class, String.class); - return (FileReader) constructor.newInstance(this.hdfs, file, conf, offset); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException("Unable to instantiate " + readerType, e); - } - } - - /** - * Renames files with .inprogress suffix - * @return path of renamed file - * @throws if operation fails - */ - private Path renameToInProgressFile(Path file) - throws IOException { - Path newFile = new Path( file.toString() + inprogress_suffix ); - try { - if (hdfs.rename(file, newFile)) { - return newFile; - } - throw new RenameException(file, newFile); - } catch (IOException e){ - throw new RenameException(file, newFile, e); - } - } - - /** Returns the corresponding input file in the 'sourceDirPath' for the specified lock file. 
- * If no such file is found then returns null - */ - private Path getFileForLockFile(Path lockFile, Path sourceDirPath) - throws IOException { - String lockFileName = lockFile.getName(); - Path dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName + inprogress_suffix); - if ( hdfs.exists(dataFile) ) { - return dataFile; - } - dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName); - if ( hdfs.exists(dataFile) ) { - return dataFile; - } - return null; - } - - - // renames files and returns the new file path - private Path renameCompletedFile(Path file) throws IOException { - String fileName = file.toString(); - String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix)); - String newName = new Path(fileNameMinusSuffix).getName(); - - Path newFile = new Path( archiveDirPath + Path.SEPARATOR + newName ); - LOG.info("Completed consuming file {}", fileNameMinusSuffix); - if ( !hdfs.rename(file, newFile) ) { - throw new IOException("Rename failed for file: " + file); - } - LOG.debug("Renamed file {} to {} ", file, newFile); - return newFile; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - if (outputStreamName!=null) { - declarer.declareStream(outputStreamName, outputFields); - } else { - declarer.declare(outputFields); - } - } - - static class MessageId implements Comparable<MessageId> { - public long msgNumber; // tracks order in which msg came in - public String fullPath; - public FileOffset offset; - - public MessageId(long msgNumber, Path fullPath, FileOffset offset) { - this.msgNumber = msgNumber; - this.fullPath = fullPath.toString(); - this.offset = offset; } - @Override - public String toString() { - return "{'" + fullPath + "':" + offset + "}"; + // renames files and returns the new file path + private Path renameCompletedFile(Path file) throws IOException { + String fileName = file.toString(); + String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix)); + String newName = new Path(fileNameMinusSuffix).getName(); + + Path newFile = new Path(archiveDirPath + Path.SEPARATOR + newName); + LOG.info("Completed consuming file {}", fileNameMinusSuffix); + if (!hdfs.rename(file, newFile)) { + throw new IOException("Rename failed for file: " + file); + } + LOG.debug("Renamed file {} to {} ", file, newFile); + return newFile; } @Override - public int compareTo(MessageId rhs) { - if ( msgNumber<rhs.msgNumber ) { - return -1; - } - if ( msgNumber>rhs.msgNumber ) { - return 1; - } - return 0; + public void declareOutputFields(OutputFieldsDeclarer declarer) { + if (outputStreamName != null) { + declarer.declareStream(outputStreamName, outputFields); + } else { + declarer.declare(outputFields); + } } - } - private static class RenameException extends IOException { - public final Path oldFile; - public final Path newFile; + static class MessageId implements Comparable<MessageId> { + + public long msgNumber; // tracks order in which msg came in + public String fullPath; + public FileOffset offset; - public RenameException(Path oldFile, Path newFile) { - super("Rename of " + oldFile + " to " + newFile + " failed"); - this.oldFile = oldFile; - this.newFile = newFile; + public MessageId(long msgNumber, Path fullPath, FileOffset offset) { + this.msgNumber = msgNumber; + this.fullPath = fullPath.toString(); + this.offset = offset; + } + + @Override + public String toString() { + return "{'" + fullPath + "':" + offset + "}"; + } + + @Override + public int compareTo(MessageId rhs) { + if 
(msgNumber < rhs.msgNumber) { + return -1; + } + if (msgNumber > rhs.msgNumber) { + return 1; + } + return 0; + } } - public RenameException(Path oldFile, Path newFile, IOException cause) { - super("Rename of " + oldFile + " to " + newFile + " failed", cause); - this.oldFile = oldFile; - this.newFile = newFile; + private static class RenameException extends IOException { + + public final Path oldFile; + public final Path newFile; + + public RenameException(Path oldFile, Path newFile) { + super("Rename of " + oldFile + " to " + newFile + " failed"); + this.oldFile = oldFile; + this.newFile = newFile; + } + + public RenameException(Path oldFile, Path newFile, IOException cause) { + super("Rename of " + oldFile + " to " + newFile + " failed", cause); + this.oldFile = oldFile; + this.newFile = newFile; + } } - } } http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java index a584f91..b588f5b 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java @@ -21,8 +21,7 @@ import org.apache.avro.Schema; import org.junit.Assert; import org.junit.Test; -import java.util.ArrayList; -import java.util.List; +import org.junit.BeforeClass; public class TestFixedAvroSerializer { //These should match FixedAvroSerializer.config in the test resources @@ -34,12 +33,13 @@ public class TestFixedAvroSerializer { "\"name\":\"stormtest2\"," + "\"fields\":[{\"name\":\"foobar1\",\"type\":\"string\"}," + "{ \"name\":\"intint1\", \"type\":\"int\" }]}"; - private static final Schema schema1; - private static final Schema schema2; + private static Schema schema1; + private static Schema schema2; final AvroSchemaRegistry reg; - static { + @BeforeClass + public static void setupClass() { Schema.Parser parser = new Schema.Parser(); schema1 = parser.parse(schemaString1); @@ -58,7 +58,8 @@ public class TestFixedAvroSerializer { testTheSchema(schema2); } - @Test public void testDifferentFPs() { + @Test + public void testDifferentFPs() { String fp1 = reg.getFingerprint(schema1); String fp2 = reg.getFingerprint(schema2); http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java index ddfdcf5..fb97782 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java @@ -19,6 +19,7 @@ package org.apache.storm.hdfs.avro; import org.apache.avro.Schema; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; public class TestGenericAvroSerializer { @@ -30,12 +31,13 @@ public class TestGenericAvroSerializer { "\"name\":\"stormtest2\"," + "\"fields\":[{\"name\":\"foobar1\",\"type\":\"string\"}," + "{ \"name\":\"intint1\", 
\"type\":\"int\" }]}"; - private static final Schema schema1; - private static final Schema schema2; + private static Schema schema1; + private static Schema schema2; AvroSchemaRegistry reg = new GenericAvroSerializer(); - static { + @BeforeClass + public static void setupClass() { Schema.Parser parser = new Schema.Parser(); schema1 = parser.parse(schemaString1); @@ -50,7 +52,8 @@ public class TestGenericAvroSerializer { testTheSchema(schema2); } - @Test public void testDifferentFPs() { + @Test + public void testDifferentFPs() { String fp1 = reg.getFingerprint(schema1); String fp2 = reg.getFingerprint(schema2);

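----------------------------------------------------------------------

The diff excerpt ends here; hunks for the remaining test files in the stats above are not included, among them the new org.apache.storm.hdfs.testing.MiniDFSClusterRule (78 added lines) that the reworked tests use to manage the MiniDFSCluster lifecycle. A minimal sketch of what such a JUnit 4 rule can look like, assuming ExternalResource and Hadoop's MiniDFSCluster.Builder; the exact fields, configuration, and accessors of the committed class are not shown in this excerpt, so treat the details as assumptions:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.junit.rules.ExternalResource;

    // Hypothetical sketch of a MiniDFSCluster lifecycle rule: the cluster is
    // built before the test and always shut down afterwards, even when the
    // test fails, so NameNode/DataNode threads cannot leak between test runs.
    public class MiniDFSClusterRule extends ExternalResource {

        private final Configuration hadoopConf = new Configuration();
        private MiniDFSCluster dfscluster;

        @Override
        protected void before() throws Throwable {
            dfscluster = new MiniDFSCluster.Builder(hadoopConf).build();
            dfscluster.waitActive();
        }

        @Override
        protected void after() {
            if (dfscluster != null) {
                dfscluster.shutdown();
            }
        }

        public MiniDFSCluster getDfscluster() {
            return dfscluster;
        }

        public Configuration getHadoopConf() {
            return hadoopConf;
        }
    }

A test would attach the rule with @Rule (one cluster per test) or, as a static field, with @ClassRule to share one cluster across the class, which is typically why such a rule replaces per-test setUp/tearDown of the cluster.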