http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index 6d626be..af0127a 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -1,12 +1,7 @@ - /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> @@ -28,77 +23,60 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.storm.Config; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.storm.Config; import org.apache.storm.hdfs.common.HdfsUtils; import org.apache.storm.hdfs.security.HdfsSecurityUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HdfsSpout extends BaseRichSpout { + // other members + private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class); + private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false); + HashMap<MessageId, List<Object>> inflight = new HashMap<>(); + LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>(); + HdfsUtils.Pair<Path, FileLock.LogEntry> lastExpiredLock = null; // user configurable private String hdfsUri; // required private String readerType; // required private Fields outputFields; // required - private String sourceDir; // required private Path sourceDirPath; // required - private String archiveDir; // required private Path archiveDirPath; // required - private String badFilesDir; // required private Path badFilesDirPath; // required - private String lockDir; private Path lockDirPath; - private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT; private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC; private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING; private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT; private boolean clocksInSync = true; - private String inprogress_suffix = ".inprogress"; // not configurable to prevent change between topology restarts private String ignoreSuffix = ".ignore"; - private String outputStreamName = null; - - // other members - private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class); - private ProgressTracker tracker = null; - private FileSystem hdfs; private FileReader reader; - private SpoutOutputCollector collector; - HashMap<MessageId, List<Object>> inflight = new HashMap<>(); - LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>(); - private Configuration hdfsConfig; - private Map<String, Object> conf = null; private FileLock lock; private String spoutId = null; - - HdfsUtils.Pair<Path, FileLock.LogEntry> lastExpiredLock = null; private long lastExpiredLockTime = 0; - private long tupleCounter = 0; private boolean ackEnabled = false; private int acksSinceLastCommit = 0; - private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false); private Timer commitTimer; private boolean fileReadCompletely = true; @@ -107,6 +85,59 @@ public class HdfsSpout extends BaseRichSpout { public HdfsSpout() { } + private static String getFileProgress(FileReader reader) { + return reader.getFilePath() + " " + reader.getFileOffset(); + } + + private static void releaseLockAndLog(FileLock fLock, String spoutId) { + try { + if (fLock != null) { + fLock.release(); + LOG.debug("Spout {} released FileLock. SpoutId = {}", fLock.getLockFile(), spoutId); + } + } catch (IOException e) { + LOG.error("Unable to delete lock file : " + fLock.getLockFile() + " SpoutId =" + spoutId, e); + } + } + + private static void validateOrMakeDir(FileSystem fs, Path dir, String dirDescription) { + try { + if (fs.exists(dir)) { + if (!fs.isDirectory(dir)) { + LOG.error(dirDescription + " directory is a file, not a dir. " + dir); + throw new RuntimeException(dirDescription + " directory is a file, not a dir. " + dir); + } + } else if (!fs.mkdirs(dir)) { + LOG.error("Unable to create " + dirDescription + " directory " + dir); + throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir); + } + } catch (IOException e) { + LOG.error("Unable to create " + dirDescription + " directory " + dir, e); + throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir, e); + } + } + + static void checkValidReader(String readerType) { + if (readerType.equalsIgnoreCase(Configs.TEXT) || readerType.equalsIgnoreCase(Configs.SEQ)) { + return; + } + try { + Class<?> classType = Class.forName(readerType); + classType.getConstructor(FileSystem.class, Path.class, Map.class); + if (!FileReader.class.isAssignableFrom(classType)) { + LOG.error(readerType + " not a FileReader"); + throw new IllegalArgumentException(readerType + " not a FileReader."); + } + return; + } catch (ClassNotFoundException e) { + LOG.error(readerType + " not found in classpath.", e); + throw new IllegalArgumentException(readerType + " not found in classpath.", e); + } catch (NoSuchMethodException e) { + LOG.error(readerType + " is missing the expected constructor for Readers.", e); + throw new IllegalArgumentException(readerType + " is missing the expected constuctor for Readers."); + } + } + public HdfsSpout setHdfsUri(String hdfsUri) { this.hdfsUri = hdfsUri; return this; @@ -211,8 +242,8 @@ public class HdfsSpout extends BaseRichSpout { if (ackEnabled && tracker.size() >= maxOutstanding) { LOG.warn("Waiting for more ACKs before generating new tuples. " - + "Progress tracker size has reached limit {}, SpoutID {}", - maxOutstanding, spoutId); + + "Progress tracker size has reached limit {}, SpoutID {}", + maxOutstanding, spoutId); // Don't emit anything .. allow configured spout wait strategy to kick in return; } @@ -260,7 +291,7 @@ public class HdfsSpout extends BaseRichSpout { return; } catch (ParseException e) { LOG.error("Parsing error when processing at file location " + getFileProgress(reader) - + ". Skipping remainder of file.", e); + + ". Skipping remainder of file.", e); markFileAsBad(reader.getFilePath()); // Note: We don't return from this method on ParseException to avoid triggering the // spout wait strategy (due to no emits). Instead we go back into the loop and @@ -301,10 +332,6 @@ public class HdfsSpout extends BaseRichSpout { commitTimer.schedule(timerTask, commitFrequencySec * 1000); } - private static String getFileProgress(FileReader reader) { - return reader.getFilePath() + " " + reader.getFileOffset(); - } - private void markFileAsDone(Path filePath) { try { Path newFile = renameCompletedFile(reader.getFilePath()); @@ -321,7 +348,8 @@ public class HdfsSpout extends BaseRichSpout { String originalName = new Path(fileNameMinusSuffix).getName(); Path newFile = new Path(badFilesDirPath + Path.SEPARATOR + originalName); - LOG.info("Moving bad file {} to {}. Processed it till offset {}. SpoutID= {}", originalName, newFile, tracker.getCommitPosition(), spoutId); + LOG.info("Moving bad file {} to {}. Processed it till offset {}. SpoutID= {}", originalName, newFile, tracker.getCommitPosition(), + spoutId); try { if (!hdfs.rename(file, newFile)) { // seems this can fail by returning false or throwing exception throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception @@ -343,17 +371,6 @@ public class HdfsSpout extends BaseRichSpout { lock = null; } - private static void releaseLockAndLog(FileLock fLock, String spoutId) { - try { - if (fLock != null) { - fLock.release(); - LOG.debug("Spout {} released FileLock. SpoutId = {}", fLock.getLockFile(), spoutId); - } - } catch (IOException e) { - LOG.error("Unable to delete lock file : " + fLock.getLockFile() + " SpoutId =" + spoutId, e); - } - } - protected void emitData(List<Object> tuple, MessageId id) { LOG.trace("Emitting - {}", id); @@ -512,48 +529,10 @@ public class HdfsSpout extends BaseRichSpout { this.commitTimer.cancel(); } - private static void validateOrMakeDir(FileSystem fs, Path dir, String dirDescription) { - try { - if (fs.exists(dir)) { - if (!fs.isDirectory(dir)) { - LOG.error(dirDescription + " directory is a file, not a dir. " + dir); - throw new RuntimeException(dirDescription + " directory is a file, not a dir. " + dir); - } - } else if (!fs.mkdirs(dir)) { - LOG.error("Unable to create " + dirDescription + " directory " + dir); - throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir); - } - } catch (IOException e) { - LOG.error("Unable to create " + dirDescription + " directory " + dir, e); - throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir, e); - } - } - private String getDefaultLockDir(Path sourceDirPath) { return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR; } - static void checkValidReader(String readerType) { - if (readerType.equalsIgnoreCase(Configs.TEXT) || readerType.equalsIgnoreCase(Configs.SEQ)) { - return; - } - try { - Class<?> classType = Class.forName(readerType); - classType.getConstructor(FileSystem.class, Path.class, Map.class); - if (!FileReader.class.isAssignableFrom(classType)) { - LOG.error(readerType + " not a FileReader"); - throw new IllegalArgumentException(readerType + " not a FileReader."); - } - return; - } catch (ClassNotFoundException e) { - LOG.error(readerType + " not found in classpath.", e); - throw new IllegalArgumentException(readerType + " not found in classpath.", e); - } catch (NoSuchMethodException e) { - LOG.error(readerType + " is missing the expected constructor for Readers.", e); - throw new IllegalArgumentException(readerType + " is missing the expected constuctor for Readers."); - } - } - @Override public void ack(Object msgId) { LOG.trace("Ack received for msg {} on spout {}", msgId, spoutId);
http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java index fdf7751f..a7845ea 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java @@ -1,26 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.spout; public class ParseException extends Exception { - public ParseException(String message, Throwable cause) { - super(message, cause); - } + public ParseException(String message, Throwable cause) { + super(message, cause); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java index e2e7126..93a9b09 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.spout; @@ -23,48 +17,48 @@ import java.util.TreeSet; public class ProgressTracker { - TreeSet<FileOffset> offsets = new TreeSet<>(); + TreeSet<FileOffset> offsets = new TreeSet<>(); - public synchronized void recordAckedOffset(FileOffset newOffset) { - if(newOffset==null) { - return; - } - offsets.add(newOffset); + public synchronized void recordAckedOffset(FileOffset newOffset) { + if (newOffset == null) { + return; + } + offsets.add(newOffset); - FileOffset currHead = offsets.first(); + FileOffset currHead = offsets.first(); - if( currHead.isNextOffset(newOffset) ) { // check is a minor optimization - trimHead(); + if (currHead.isNextOffset(newOffset)) { // check is a minor optimization + trimHead(); + } } - } - // remove contiguous elements from the head of the heap - // e.g.: 1,2,3,4,10,11,12,15 => 4,10,11,12,15 - private synchronized void trimHead() { - if(offsets.size()<=1) { - return; - } - FileOffset head = offsets.first(); - FileOffset head2 = offsets.higher(head); - if( head.isNextOffset(head2) ) { - offsets.pollFirst(); - trimHead(); + // remove contiguous elements from the head of the heap + // e.g.: 1,2,3,4,10,11,12,15 => 4,10,11,12,15 + private synchronized void trimHead() { + if (offsets.size() <= 1) { + return; + } + FileOffset head = offsets.first(); + FileOffset head2 = offsets.higher(head); + if (head.isNextOffset(head2)) { + offsets.pollFirst(); + trimHead(); + } + return; } - return; - } - public synchronized FileOffset getCommitPosition() { - if(!offsets.isEmpty()) { - return offsets.first().clone(); + public synchronized FileOffset getCommitPosition() { + if (!offsets.isEmpty()) { + return offsets.first().clone(); + } + return null; } - return null; - } - public synchronized void dumpState(PrintStream stream) { - stream.println(offsets); - } + public synchronized void dumpState(PrintStream stream) { + stream.println(offsets); + } - public synchronized int size() { - return offsets.size(); - } + public synchronized int size() { + return offsets.size(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java index 64b6b7a..ab61c2b 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java @@ -1,23 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.spout; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; @@ -26,188 +25,185 @@ import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; +public class SequenceFileReader<Key extends Writable, Value extends Writable> + extends AbstractFileReader { + public static final String[] defaultFields = { "key", "value" }; + public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes"; + private static final Logger LOG = LoggerFactory + .getLogger(SequenceFileReader.class); + private static final int DEFAULT_BUFF_SIZE = 4096; + private final SequenceFile.Reader reader; -public class SequenceFileReader<Key extends Writable,Value extends Writable> - extends AbstractFileReader { - private static final Logger LOG = LoggerFactory - .getLogger(SequenceFileReader.class); - public static final String[] defaultFields = {"key", "value"}; - private static final int DEFAULT_BUFF_SIZE = 4096; - public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes"; - - private final SequenceFile.Reader reader; - - private final SequenceFileReader.Offset offset; - - - private final Key key; - private final Value value; - - - public SequenceFileReader(FileSystem fs, Path file, Map<String, Object> conf) - throws IOException { - super(fs, file); - int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() ); - this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) ); - this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf() ); - this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf() ); - this.offset = new SequenceFileReader.Offset(0,0,0); - } - - public SequenceFileReader(FileSystem fs, Path file, Map<String, Object> conf, String offset) - throws IOException { - super(fs, file); - int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() ); - this.offset = new SequenceFileReader.Offset(offset); - this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) ); - this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf() ); - this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf() ); - skipToOffset(this.reader, this.offset, this.key); - } - - private static <K> void skipToOffset(SequenceFile.Reader reader, Offset offset, K key) throws IOException { - reader.sync(offset.lastSyncPoint); - for(int i=0; i<offset.recordsSinceLastSync; ++i) { - reader.next(key); - } - } - - public List<Object> next() throws IOException, ParseException { - if( reader.next(key, value) ) { - ArrayList<Object> result = new ArrayList<Object>(2); - Collections.addAll(result, key, value); - offset.increment(reader.syncSeen(), reader.getPosition() ); - return result; - } - return null; - } - - @Override - public void close() { - try { - reader.close(); - } catch (IOException e) { - LOG.warn("Ignoring error when closing file " + getFilePath(), e); - } - } + private final SequenceFileReader.Offset offset; - public Offset getFileOffset() { - return offset; - } + private final Key key; + private final Value value; - public static class Offset implements FileOffset { - public long lastSyncPoint; - public long recordsSinceLastSync; - public long currentRecord; - private long currRecordEndOffset; - private long prevRecordEndOffset; - public Offset(long lastSyncPoint, long recordsSinceLastSync, long currentRecord) { - this(lastSyncPoint, recordsSinceLastSync, currentRecord, 0, 0 ); + public SequenceFileReader(FileSystem fs, Path file, Map<String, Object> conf) + throws IOException { + super(fs, file); + int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt(conf.get(BUFFER_SIZE).toString()); + this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize)); + this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf()); + this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf()); + this.offset = new SequenceFileReader.Offset(0, 0, 0); } - public Offset(long lastSyncPoint, long recordsSinceLastSync, long currentRecord - , long currRecordEndOffset, long prevRecordEndOffset) { - this.lastSyncPoint = lastSyncPoint; - this.recordsSinceLastSync = recordsSinceLastSync; - this.currentRecord = currentRecord; - this.prevRecordEndOffset = prevRecordEndOffset; - this.currRecordEndOffset = currRecordEndOffset; + public SequenceFileReader(FileSystem fs, Path file, Map<String, Object> conf, String offset) + throws IOException { + super(fs, file); + int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt(conf.get(BUFFER_SIZE).toString()); + this.offset = new SequenceFileReader.Offset(offset); + this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize)); + this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf()); + this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf()); + skipToOffset(this.reader, this.offset, this.key); } - public Offset(String offset) { - try { - if(offset==null) { - throw new IllegalArgumentException("offset cannot be null"); - } - if(offset.equalsIgnoreCase("0")) { - this.lastSyncPoint = 0; - this.recordsSinceLastSync = 0; - this.currentRecord = 0; - this.prevRecordEndOffset = 0; - this.currRecordEndOffset = 0; - } else { - String[] parts = offset.split(":"); - this.lastSyncPoint = Long.parseLong(parts[0].split("=")[1]); - this.recordsSinceLastSync = Long.parseLong(parts[1].split("=")[1]); - this.currentRecord = Long.parseLong(parts[2].split("=")[1]); - this.prevRecordEndOffset = 0; - this.currRecordEndOffset = 0; + private static <K> void skipToOffset(SequenceFile.Reader reader, Offset offset, K key) throws IOException { + reader.sync(offset.lastSyncPoint); + for (int i = 0; i < offset.recordsSinceLastSync; ++i) { + reader.next(key); } - } catch (Exception e) { - throw new IllegalArgumentException("'" + offset + - "' cannot be interpreted. It is not in expected format for SequenceFileReader." + - " Format e.g. {sync=123:afterSync=345:record=67}"); - } } - @Override - public String toString() { - return '{' + - "sync=" + lastSyncPoint + - ":afterSync=" + recordsSinceLastSync + - ":record=" + currentRecord + - ":}"; + public List<Object> next() throws IOException, ParseException { + if (reader.next(key, value)) { + ArrayList<Object> result = new ArrayList<Object>(2); + Collections.addAll(result, key, value); + offset.increment(reader.syncSeen(), reader.getPosition()); + return result; + } + return null; } @Override - public boolean isNextOffset(FileOffset rhs) { - if(rhs instanceof Offset) { - Offset other = ((Offset) rhs); - return other.currentRecord > currentRecord+1; - } - return false; + public void close() { + try { + reader.close(); + } catch (IOException e) { + LOG.warn("Ignoring error when closing file " + getFilePath(), e); + } } - @Override - public int compareTo(FileOffset o) { - Offset rhs = ((Offset) o); - if(currentRecord<rhs.currentRecord) { - return -1; - } - if(currentRecord==rhs.currentRecord) { - return 0; - } - return 1; + public Offset getFileOffset() { + return offset; } - @Override - public boolean equals(Object o) { - if (this == o) { return true; } - if (!(o instanceof Offset)) { return false; } - Offset offset = (Offset) o; + public static class Offset implements FileOffset { + public long lastSyncPoint; + public long recordsSinceLastSync; + public long currentRecord; + private long currRecordEndOffset; + private long prevRecordEndOffset; - return currentRecord == offset.currentRecord; - } + public Offset(long lastSyncPoint, long recordsSinceLastSync, long currentRecord) { + this(lastSyncPoint, recordsSinceLastSync, currentRecord, 0, 0); + } - @Override - public int hashCode() { - return (int) (currentRecord ^ (currentRecord >>> 32)); - } - - void increment(boolean syncSeen, long newBytePosition) { - if(!syncSeen) { - ++recordsSinceLastSync; - } else { - recordsSinceLastSync = 1; - lastSyncPoint = prevRecordEndOffset; - } - ++currentRecord; - prevRecordEndOffset = currRecordEndOffset; - currentRecord = newBytePosition; - } + public Offset(long lastSyncPoint, long recordsSinceLastSync, long currentRecord + , long currRecordEndOffset, long prevRecordEndOffset) { + this.lastSyncPoint = lastSyncPoint; + this.recordsSinceLastSync = recordsSinceLastSync; + this.currentRecord = currentRecord; + this.prevRecordEndOffset = prevRecordEndOffset; + this.currRecordEndOffset = currRecordEndOffset; + } - @Override - public Offset clone() { - return new Offset(lastSyncPoint, recordsSinceLastSync, currentRecord, currRecordEndOffset, prevRecordEndOffset); - } + public Offset(String offset) { + try { + if (offset == null) { + throw new IllegalArgumentException("offset cannot be null"); + } + if (offset.equalsIgnoreCase("0")) { + this.lastSyncPoint = 0; + this.recordsSinceLastSync = 0; + this.currentRecord = 0; + this.prevRecordEndOffset = 0; + this.currRecordEndOffset = 0; + } else { + String[] parts = offset.split(":"); + this.lastSyncPoint = Long.parseLong(parts[0].split("=")[1]); + this.recordsSinceLastSync = Long.parseLong(parts[1].split("=")[1]); + this.currentRecord = Long.parseLong(parts[2].split("=")[1]); + this.prevRecordEndOffset = 0; + this.currRecordEndOffset = 0; + } + } catch (Exception e) { + throw new IllegalArgumentException("'" + offset + + "' cannot be interpreted. It is not in expected format for SequenceFileReader." + + " Format e.g. {sync=123:afterSync=345:record=67}"); + } + } + + @Override + public String toString() { + return '{' + + "sync=" + lastSyncPoint + + ":afterSync=" + recordsSinceLastSync + + ":record=" + currentRecord + + ":}"; + } + + @Override + public boolean isNextOffset(FileOffset rhs) { + if (rhs instanceof Offset) { + Offset other = ((Offset) rhs); + return other.currentRecord > currentRecord + 1; + } + return false; + } + + @Override + public int compareTo(FileOffset o) { + Offset rhs = ((Offset) o); + if (currentRecord < rhs.currentRecord) { + return -1; + } + if (currentRecord == rhs.currentRecord) { + return 0; + } + return 1; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Offset)) { + return false; + } + + Offset offset = (Offset) o; + + return currentRecord == offset.currentRecord; + } + + @Override + public int hashCode() { + return (int) (currentRecord ^ (currentRecord >>> 32)); + } + + void increment(boolean syncSeen, long newBytePosition) { + if (!syncSeen) { + ++recordsSinceLastSync; + } else { + recordsSinceLastSync = 1; + lastSyncPoint = prevRecordEndOffset; + } + ++currentRecord; + prevRecordEndOffset = currRecordEndOffset; + currentRecord = newBytePosition; + } + + @Override + public Offset clone() { + return new Offset(lastSyncPoint, recordsSinceLastSync, currentRecord, currRecordEndOffset, prevRecordEndOffset); + } - } //class Offset + } //class Offset } //class http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java index a393238..cc5531e 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java @@ -1,192 +1,190 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.spout; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; // Todo: Track file offsets instead of line number public class TextFileReader extends AbstractFileReader { - public static final String[] defaultFields = {"line"}; - public static final String CHARSET = "hdfsspout.reader.charset"; - public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes"; - - private static final int DEFAULT_BUFF_SIZE = 4096; - - private BufferedReader reader; - private final Logger LOG = LoggerFactory.getLogger(TextFileReader.class); - private TextFileReader.Offset offset; - - public TextFileReader(FileSystem fs, Path file, Map<String, Object> conf) throws IOException { - this(fs, file, conf, new TextFileReader.Offset(0,0) ); - } - - public TextFileReader(FileSystem fs, Path file, Map<String, Object> conf, String startOffset) throws IOException { - this(fs, file, conf, new TextFileReader.Offset(startOffset) ); - } - - private TextFileReader(FileSystem fs, Path file, Map<String, Object> conf, TextFileReader.Offset startOffset) - throws IOException { - super(fs, file); - offset = startOffset; - FSDataInputStream in = fs.open(file); - - String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString(); - int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() ); - reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz); - if(offset.charOffset >0) { - reader.skip(offset.charOffset); - } - - } + public static final String[] defaultFields = { "line" }; + public static final String CHARSET = "hdfsspout.reader.charset"; + public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes"; - public Offset getFileOffset() { - return offset.clone(); - } + private static final int DEFAULT_BUFF_SIZE = 4096; + private final Logger LOG = LoggerFactory.getLogger(TextFileReader.class); + private BufferedReader reader; + private TextFileReader.Offset offset; - public List<Object> next() throws IOException, ParseException { - String line = readLineAndTrackOffset(reader); - if(line!=null) { - return Collections.singletonList((Object) line); - } - return null; - } - - private String readLineAndTrackOffset(BufferedReader reader) throws IOException { - StringBuffer sb = new StringBuffer(1000); - long before = offset.charOffset; - int ch; - while( (ch = reader.read()) != -1 ) { - ++offset.charOffset; - if (ch == '\n') { - ++offset.lineNumber; - return sb.toString(); - } else if( ch != '\r') { - sb.append((char)ch); - } - } - if(before==offset.charOffset) { // reached EOF, didnt read anything - return null; + public TextFileReader(FileSystem fs, Path file, Map<String, Object> conf) throws IOException { + this(fs, file, conf, new TextFileReader.Offset(0, 0)); } - return sb.toString(); - } - - @Override - public void close() { - try { - reader.close(); - } catch (IOException e) { - LOG.warn("Ignoring error when closing file " + getFilePath(), e); + + public TextFileReader(FileSystem fs, Path file, Map<String, Object> conf, String startOffset) throws IOException { + this(fs, file, conf, new TextFileReader.Offset(startOffset)); } - } - public static class Offset implements FileOffset { - long charOffset; - long lineNumber; + private TextFileReader(FileSystem fs, Path file, Map<String, Object> conf, TextFileReader.Offset startOffset) + throws IOException { + super(fs, file); + offset = startOffset; + FSDataInputStream in = fs.open(file); + + String charSet = (conf == null || !conf.containsKey(CHARSET)) ? "UTF-8" : conf.get(CHARSET).toString(); + int buffSz = + (conf == null || !conf.containsKey(BUFFER_SIZE)) ? DEFAULT_BUFF_SIZE : Integer.parseInt(conf.get(BUFFER_SIZE).toString()); + reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz); + if (offset.charOffset > 0) { + reader.skip(offset.charOffset); + } - public Offset(long byteOffset, long lineNumber) { - this.charOffset = byteOffset; - this.lineNumber = lineNumber; } - public Offset(String offset) { - if(offset==null) { - throw new IllegalArgumentException("offset cannot be null"); - } - try { - if(offset.equalsIgnoreCase("0")) { - this.charOffset = 0; - this.lineNumber = 0; - } else { - String[] parts = offset.split(":"); - this.charOffset = Long.parseLong(parts[0].split("=")[1]); - this.lineNumber = Long.parseLong(parts[1].split("=")[1]); - } - } catch (Exception e) { - throw new IllegalArgumentException("'" + offset + - "' cannot be interpreted. It is not in expected format for TextFileReader." + - " Format e.g. {char=123:line=5}"); - } + public Offset getFileOffset() { + return offset.clone(); } - @Override - public String toString() { - return '{' + - "char=" + charOffset + - ":line=" + lineNumber + - ":}"; + public List<Object> next() throws IOException, ParseException { + String line = readLineAndTrackOffset(reader); + if (line != null) { + return Collections.singletonList((Object) line); + } + return null; } - @Override - public boolean isNextOffset(FileOffset rhs) { - if(rhs instanceof Offset) { - Offset other = ((Offset) rhs); - return other.charOffset > charOffset && - other.lineNumber == lineNumber+1; - } - return false; + private String readLineAndTrackOffset(BufferedReader reader) throws IOException { + StringBuffer sb = new StringBuffer(1000); + long before = offset.charOffset; + int ch; + while ((ch = reader.read()) != -1) { + ++offset.charOffset; + if (ch == '\n') { + ++offset.lineNumber; + return sb.toString(); + } else if (ch != '\r') { + sb.append((char) ch); + } + } + if (before == offset.charOffset) { // reached EOF, didnt read anything + return null; + } + return sb.toString(); } @Override - public int compareTo(FileOffset o) { - Offset rhs = ((Offset)o); - if(lineNumber < rhs.lineNumber) { - return -1; - } - if(lineNumber == rhs.lineNumber) { - return 0; - } - return 1; + public void close() { + try { + reader.close(); + } catch (IOException e) { + LOG.warn("Ignoring error when closing file " + getFilePath(), e); + } } - @Override - public boolean equals(Object o) { - if (this == o) { return true; } - if (!(o instanceof Offset)) { return false; } + public static class Offset implements FileOffset { + long charOffset; + long lineNumber; - Offset that = (Offset) o; + public Offset(long byteOffset, long lineNumber) { + this.charOffset = byteOffset; + this.lineNumber = lineNumber; + } - if (charOffset != that.charOffset) - return false; - return lineNumber == that.lineNumber; - } + public Offset(String offset) { + if (offset == null) { + throw new IllegalArgumentException("offset cannot be null"); + } + try { + if (offset.equalsIgnoreCase("0")) { + this.charOffset = 0; + this.lineNumber = 0; + } else { + String[] parts = offset.split(":"); + this.charOffset = Long.parseLong(parts[0].split("=")[1]); + this.lineNumber = Long.parseLong(parts[1].split("=")[1]); + } + } catch (Exception e) { + throw new IllegalArgumentException("'" + offset + + "' cannot be interpreted. It is not in expected format for TextFileReader." + + " Format e.g. {char=123:line=5}"); + } + } - @Override - public int hashCode() { - int result = (int) (charOffset ^ (charOffset >>> 32)); - result = 31 * result + (int) (lineNumber ^ (lineNumber >>> 32)); - return result; - } + @Override + public String toString() { + return '{' + + "char=" + charOffset + + ":line=" + lineNumber + + ":}"; + } - @Override - public Offset clone() { - return new Offset(charOffset, lineNumber); - } - } //class Offset + @Override + public boolean isNextOffset(FileOffset rhs) { + if (rhs instanceof Offset) { + Offset other = ((Offset) rhs); + return other.charOffset > charOffset && + other.lineNumber == lineNumber + 1; + } + return false; + } + + @Override + public int compareTo(FileOffset o) { + Offset rhs = ((Offset) o); + if (lineNumber < rhs.lineNumber) { + return -1; + } + if (lineNumber == rhs.lineNumber) { + return 0; + } + return 1; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Offset)) { + return false; + } + + Offset that = (Offset) o; + + if (charOffset != that.charOffset) { + return false; + } + return lineNumber == that.lineNumber; + } + + @Override + public int hashCode() { + int result = (int) (charOffset ^ (charOffset >>> 32)); + result = 31 * result + (int) (lineNumber ^ (lineNumber >>> 32)); + return result; + } + + @Override + public Offset clone() { + return new Offset(charOffset, lineNumber); + } + } //class Offset } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java index 07968f2..e6adfbb 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java @@ -15,8 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.hdfs.trident; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.net.URI; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -43,32 +55,154 @@ import org.apache.storm.trident.tuple.TridentTuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.Serializable; -import java.net.URI; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; - public class HdfsState implements State { + public static final Logger LOG = LoggerFactory.getLogger(HdfsState.class); + private Options options; + private volatile TxnRecord lastSeenTxn; + private Path indexFilePath; + + + HdfsState(Options options) { + this.options = options; + } + + void prepare(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { + this.options.prepare(conf, partitionIndex, numPartitions); + initLastTxn(conf, partitionIndex); + } + + private TxnRecord readTxnRecord(Path path) throws IOException { + FSDataInputStream inputStream = null; + try { + inputStream = this.options.fs.open(path); + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + String line; + if ((line = reader.readLine()) != null) { + String[] fields = line.split(","); + return new TxnRecord(Long.valueOf(fields[0]), fields[1], Long.valueOf(fields[2])); + } + } finally { + if (inputStream != null) { + inputStream.close(); + } + } + return new TxnRecord(0, options.currentFile.toString(), 0); + } + + /** + * Returns temp file path corresponding to a file name. + */ + private Path tmpFilePath(String filename) { + return new Path(filename + ".tmp"); + } + + /** + * Reads the last txn record from index file if it exists, if not from .tmp file if exists. + * + * @param indexFilePath the index file path + * @return the txn record from the index file or a default initial record. + * + * @throws IOException + */ + private TxnRecord getTxnRecord(Path indexFilePath) throws IOException { + Path tmpPath = tmpFilePath(indexFilePath.toString()); + if (this.options.fs.exists(indexFilePath)) { + return readTxnRecord(indexFilePath); + } else if (this.options.fs.exists(tmpPath)) { + return readTxnRecord(tmpPath); + } + return new TxnRecord(0, options.currentFile.toString(), 0); + } + + private void initLastTxn(Map<String, Object> conf, int partition) { + // include partition id in the file name so that index for different partitions are independent. + String indexFileName = String.format(".index.%s.%d", conf.get(Config.TOPOLOGY_NAME), partition); + this.indexFilePath = new Path(options.fileNameFormat.getPath(), indexFileName); + try { + this.lastSeenTxn = getTxnRecord(indexFilePath); + LOG.debug("initLastTxn updated lastSeenTxn to [{}]", this.lastSeenTxn); + } catch (IOException e) { + LOG.warn("initLastTxn failed due to IOException.", e); + throw new RuntimeException(e); + } + } + + private void updateIndex(long txId) { + LOG.debug("Starting index update."); + final Path tmpPath = tmpFilePath(indexFilePath.toString()); + + try (FSDataOutputStream out = this.options.fs.create(tmpPath, true); + BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out))) { + TxnRecord txnRecord = new TxnRecord(txId, options.currentFile.toString(), this.options.getCurrentOffset()); + bw.write(txnRecord.toString()); + bw.newLine(); + bw.flush(); + out.close(); /* In non error scenarios, for the Azure Data Lake Store File System (adl://), + the output stream must be closed before the file associated with it is deleted. + For ADLFS deleting the file also removes any handles to the file, hence out.close() will fail. */ + /* + * Delete the current index file and rename the tmp file to atomically + * replace the index file. Orphan .tmp files are handled in getTxnRecord. + */ + options.fs.delete(this.indexFilePath, false); + options.fs.rename(tmpPath, this.indexFilePath); + lastSeenTxn = txnRecord; + LOG.debug("updateIndex updated lastSeenTxn to [{}]", this.lastSeenTxn); + } catch (IOException e) { + LOG.warn("Begin commit failed due to IOException. Failing batch", e); + throw new FailedException(e); + } + } + + @Override + public void beginCommit(Long txId) { + if (txId <= lastSeenTxn.txnid) { + LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn); + long start = System.currentTimeMillis(); + options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset); + LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start); + } + updateIndex(txId); + } + + @Override + public void commit(Long txId) { + try { + options.doCommit(txId); + } catch (IOException e) { + LOG.warn("Commit failed due to IOException. Failing the batch.", e); + throw new FailedException(e); + } + } + + public void updateState(List<TridentTuple> tuples, TridentCollector tridentCollector) { + try { + this.options.execute(tuples); + } catch (IOException e) { + LOG.warn("Failing batch due to IOException.", e); + throw new FailedException(e); + } + } + + /** + * for unit tests + */ + void close() throws IOException { + this.options.closeOutputFile(); + } + public static abstract class Options implements Serializable { protected String fsUrl; protected String configKey; protected transient FileSystem fs; - private Path currentFile; protected FileRotationPolicy rotationPolicy; protected FileNameFormat fileNameFormat; protected int rotation = 0; protected transient Configuration hdfsConfig; protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>(); - + private Path currentFile; abstract void closeOutputFile() throws IOException; @@ -78,7 +212,7 @@ public class HdfsState implements State { abstract void doPrepare(Map<String, Object> conf, int partitionIndex, int numPartitions) throws IOException; - abstract long getCurrentOffset() throws IOException; + abstract long getCurrentOffset() throws IOException; abstract void doCommit(Long txId) throws IOException; @@ -143,8 +277,7 @@ public class HdfsState implements State { } /** - * Recovers nBytes from srcFile to the new file created - * by calling rotateOutputFile and then deletes the srcFile. + * Recovers nBytes from srcFile to the new file created by calling rotateOutputFile and then deletes the srcFile. */ private void recover(String srcFile, long nBytes) { try { @@ -169,10 +302,10 @@ public class HdfsState implements State { public static class HdfsFileOptions extends Options { - private transient FSDataOutputStream out; protected RecordFormat format; + private transient FSDataOutputStream out; private long offset = 0; - private int bufferSize = 131072; // default 128 K + private int bufferSize = 131072; // default 128 K public HdfsFileOptions withFsUrl(String fsUrl) { this.fsUrl = fsUrl; @@ -360,24 +493,25 @@ public class HdfsState implements State { @Override void doRecover(Path srcPath, long nBytes) throws Exception { SequenceFile.Reader reader = new SequenceFile.Reader(this.hdfsConfig, - SequenceFile.Reader.file(srcPath), SequenceFile.Reader.length(nBytes)); + SequenceFile.Reader.file(srcPath), SequenceFile.Reader.length(nBytes)); Writable key = (Writable) this.format.keyClass().newInstance(); Writable value = (Writable) this.format.valueClass().newInstance(); - while(reader.next(key, value)) { + while (reader.next(key, value)) { this.writer.append(key, value); } } @Override Path createOutputFile() throws IOException { - Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis())); + Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(), + this.fileNameFormat.getName(this.rotation, System.currentTimeMillis())); this.writer = SequenceFile.createWriter( - this.hdfsConfig, - SequenceFile.Writer.file(p), - SequenceFile.Writer.keyClass(this.format.keyClass()), - SequenceFile.Writer.valueClass(this.format.valueClass()), - SequenceFile.Writer.compression(this.compressionType, this.codecFactory.getCodecByName(this.compressionCodec)) + this.hdfsConfig, + SequenceFile.Writer.file(p), + SequenceFile.Writer.keyClass(this.format.keyClass()), + SequenceFile.Writer.valueClass(this.format.valueClass()), + SequenceFile.Writer.compression(this.compressionType, this.codecFactory.getCodecByName(this.compressionCodec)) ); return p; } @@ -418,138 +552,4 @@ public class HdfsState implements State { return Long.toString(txnid) + "," + dataFilePath + "," + Long.toString(offset); } } - - - public static final Logger LOG = LoggerFactory.getLogger(HdfsState.class); - private Options options; - private volatile TxnRecord lastSeenTxn; - private Path indexFilePath; - - HdfsState(Options options) { - this.options = options; - } - - void prepare(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - this.options.prepare(conf, partitionIndex, numPartitions); - initLastTxn(conf, partitionIndex); - } - - private TxnRecord readTxnRecord(Path path) throws IOException { - FSDataInputStream inputStream = null; - try { - inputStream = this.options.fs.open(path); - BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); - String line; - if ((line = reader.readLine()) != null) { - String[] fields = line.split(","); - return new TxnRecord(Long.valueOf(fields[0]), fields[1], Long.valueOf(fields[2])); - } - } finally { - if (inputStream != null) { - inputStream.close(); - } - } - return new TxnRecord(0, options.currentFile.toString(), 0); - } - - /** - * Returns temp file path corresponding to a file name. - */ - private Path tmpFilePath(String filename) { - return new Path(filename + ".tmp"); - } - /** - * Reads the last txn record from index file if it exists, if not - * from .tmp file if exists. - * - * @param indexFilePath the index file path - * @return the txn record from the index file or a default initial record. - * @throws IOException - */ - private TxnRecord getTxnRecord(Path indexFilePath) throws IOException { - Path tmpPath = tmpFilePath(indexFilePath.toString()); - if (this.options.fs.exists(indexFilePath)) { - return readTxnRecord(indexFilePath); - } else if (this.options.fs.exists(tmpPath)) { - return readTxnRecord(tmpPath); - } - return new TxnRecord(0, options.currentFile.toString(), 0); - } - - private void initLastTxn(Map<String, Object> conf, int partition) { - // include partition id in the file name so that index for different partitions are independent. - String indexFileName = String.format(".index.%s.%d", conf.get(Config.TOPOLOGY_NAME), partition); - this.indexFilePath = new Path(options.fileNameFormat.getPath(), indexFileName); - try { - this.lastSeenTxn = getTxnRecord(indexFilePath); - LOG.debug("initLastTxn updated lastSeenTxn to [{}]", this.lastSeenTxn); - } catch (IOException e) { - LOG.warn("initLastTxn failed due to IOException.", e); - throw new RuntimeException(e); - } - } - - private void updateIndex(long txId) { - LOG.debug("Starting index update."); - final Path tmpPath = tmpFilePath(indexFilePath.toString()); - - try (FSDataOutputStream out = this.options.fs.create(tmpPath, true); - BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out))) { - TxnRecord txnRecord = new TxnRecord(txId, options.currentFile.toString(), this.options.getCurrentOffset()); - bw.write(txnRecord.toString()); - bw.newLine(); - bw.flush(); - out.close(); /* In non error scenarios, for the Azure Data Lake Store File System (adl://), - the output stream must be closed before the file associated with it is deleted. - For ADLFS deleting the file also removes any handles to the file, hence out.close() will fail. */ - /* - * Delete the current index file and rename the tmp file to atomically - * replace the index file. Orphan .tmp files are handled in getTxnRecord. - */ - options.fs.delete(this.indexFilePath, false); - options.fs.rename(tmpPath, this.indexFilePath); - lastSeenTxn = txnRecord; - LOG.debug("updateIndex updated lastSeenTxn to [{}]", this.lastSeenTxn); - } catch (IOException e) { - LOG.warn("Begin commit failed due to IOException. Failing batch", e); - throw new FailedException(e); - } - } - - @Override - public void beginCommit(Long txId) { - if (txId <= lastSeenTxn.txnid) { - LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn); - long start = System.currentTimeMillis(); - options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset); - LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start); - } - updateIndex(txId); - } - - @Override - public void commit(Long txId) { - try { - options.doCommit(txId); - } catch (IOException e) { - LOG.warn("Commit failed due to IOException. Failing the batch.", e); - throw new FailedException(e); - } - } - - public void updateState(List<TridentTuple> tuples, TridentCollector tridentCollector) { - try { - this.options.execute(tuples); - } catch (IOException e) { - LOG.warn("Failing batch due to IOException.", e); - throw new FailedException(e); - } - } - - /** - * for unit tests - */ - void close() throws IOException { - this.options.closeOutputFile(); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java index e76ec22..568f8bc 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java @@ -15,23 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.hdfs.trident; +import java.util.Map; import org.apache.storm.task.IMetricsContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; - -import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HdfsStateFactory implements StateFactory { private static final Logger LOG = LoggerFactory.getLogger(HdfsStateFactory.class); private HdfsState.Options options; - public HdfsStateFactory(){} + public HdfsStateFactory() {} - public HdfsStateFactory withOptions(HdfsState.Options options){ + public HdfsStateFactory withOptions(HdfsState.Options options) { this.options = options; return this; } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsUpdater.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsUpdater.java index c603334..a63bb40 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsUpdater.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsUpdater.java @@ -15,15 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.hdfs.trident; +import java.util.List; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.BaseStateUpdater; import org.apache.storm.trident.tuple.TridentTuple; -import java.util.List; - -public class HdfsUpdater extends BaseStateUpdater<HdfsState>{ +public class HdfsUpdater extends BaseStateUpdater<HdfsState> { @Override public void updateState(HdfsState state, List<TridentTuple> tuples, TridentCollector collector) { state.updateState(tuples, collector); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java index a952b36..825a0f0 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.trident.format; import java.util.Map; @@ -45,7 +40,7 @@ public class DefaultFileNameFormat implements FileNameFormat { * @param prefix * @return */ - public DefaultFileNameFormat withPrefix(String prefix){ + public DefaultFileNameFormat withPrefix(String prefix) { this.prefix = prefix; return this; } @@ -56,12 +51,12 @@ public class DefaultFileNameFormat implements FileNameFormat { * @param extension * @return */ - public DefaultFileNameFormat withExtension(String extension){ + public DefaultFileNameFormat withExtension(String extension) { this.extension = extension; return this; } - public DefaultFileNameFormat withPath(String path){ + public DefaultFileNameFormat withPath(String path) { this.path = path; return this; } @@ -74,10 +69,10 @@ public class DefaultFileNameFormat implements FileNameFormat { @Override public String getName(long rotation, long timeStamp) { - return this.prefix + "-" + this.partitionIndex + "-" + rotation + "-" + timeStamp + this.extension; + return this.prefix + "-" + this.partitionIndex + "-" + rotation + "-" + timeStamp + this.extension; } - public String getPath(){ + public String getPath() { return this.path; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultSequenceFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultSequenceFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultSequenceFormat.java index 1336144..f33c030 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultSequenceFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultSequenceFormat.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.hdfs.trident.format; import org.apache.hadoop.io.LongWritable; @@ -25,7 +26,6 @@ import org.apache.storm.trident.tuple.TridentTuple; /** * Basic <code>SequenceFormat</code> implementation that uses * <code>LongWritable</code> for keys and <code>Text</code> for values. - * */ public class DefaultSequenceFormat implements SequenceFormat { private transient LongWritable key; @@ -34,13 +34,12 @@ public class DefaultSequenceFormat implements SequenceFormat { private String keyField; private String valueField; - public DefaultSequenceFormat(String keyField, String valueField){ + public DefaultSequenceFormat(String keyField, String valueField) { this.keyField = keyField; this.valueField = valueField; } - @Override public Class keyClass() { return LongWritable.class; @@ -53,8 +52,8 @@ public class DefaultSequenceFormat implements SequenceFormat { @Override public Writable key(TridentTuple tuple) { - if(this.key == null){ - this.key = new LongWritable(); + if (this.key == null) { + this.key = new LongWritable(); } this.key.set(tuple.getLongByField(this.keyField)); return this.key; @@ -62,7 +61,7 @@ public class DefaultSequenceFormat implements SequenceFormat { @Override public Writable value(TridentTuple tuple) { - if(this.value == null){ + if (this.value == null) { this.value = new Text(); } this.value.set(tuple.getStringByField(this.valueField)); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java index a08664d..e21fede 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java @@ -1,24 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.trident.format; -import org.apache.storm.tuple.Fields; import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.tuple.Fields; /** * RecordFormat implementation that uses field and record delimiters. @@ -40,7 +35,7 @@ public class DelimitedRecordFormat implements RecordFormat { * @param fields * @return */ - public DelimitedRecordFormat withFields(Fields fields){ + public DelimitedRecordFormat withFields(Fields fields) { this.fields = fields; return this; } @@ -51,7 +46,7 @@ public class DelimitedRecordFormat implements RecordFormat { * @param delimiter * @return */ - public DelimitedRecordFormat withFieldDelimiter(String delimiter){ + public DelimitedRecordFormat withFieldDelimiter(String delimiter) { this.fieldDelimiter = delimiter; return this; } @@ -62,7 +57,7 @@ public class DelimitedRecordFormat implements RecordFormat { * @param delimiter * @return */ - public DelimitedRecordFormat withRecordDelimiter(String delimiter){ + public DelimitedRecordFormat withRecordDelimiter(String delimiter) { this.recordDelimiter = delimiter; return this; } @@ -71,9 +66,9 @@ public class DelimitedRecordFormat implements RecordFormat { public byte[] format(TridentTuple tuple) { StringBuilder sb = new StringBuilder(); int size = this.fields.size(); - for(int i = 0; i < size; i++){ + for (int i = 0; i < size; i++) { sb.append(tuple.getValueByField(fields.get(i))); - if(i != size - 1){ + if (i != size - 1) { sb.append(this.fieldDelimiter); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java index c5b0698..fbd8f5a 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.trident.format; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java index 76179d9..1cc5363 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java @@ -1,26 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hdfs.trident.format; +package org.apache.storm.hdfs.trident.format; -import org.apache.storm.trident.tuple.TridentTuple; import java.io.Serializable; +import org.apache.storm.trident.tuple.TridentTuple; /** * Formats a Tuple object into a byte array http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java index b4d6c5c..497d045 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java @@ -15,16 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.hdfs.trident.format; +import java.io.Serializable; import org.apache.hadoop.io.Writable; import org.apache.storm.trident.tuple.TridentTuple; -import java.io.Serializable; - /** * Interface for converting <code>TridentTuple</code> objects to HDFS sequence file key-value pairs. - * */ public interface SequenceFormat extends Serializable { /** @@ -36,6 +35,7 @@ public interface SequenceFormat extends Serializable { /** * Value class used by implementation (e.g. Text.class, etc.) + * * @return */ Class valueClass(); @@ -50,6 +50,7 @@ public interface SequenceFormat extends Serializable { /** * Given a tuple, return the value that should be written to the sequence file. + * * @param tuple * @return */ http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java index c676324..068390f 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java @@ -1,27 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.trident.format; import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; - import org.apache.storm.utils.Utils; public class SimpleFileNameFormat implements FileNameFormat { @@ -39,10 +33,10 @@ public class SimpleFileNameFormat implements FileNameFormat { // compile parameters SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormat); String ret = name - .replace("$TIME", dateFormat.format(new Date(timeStamp))) - .replace("$NUM", String.valueOf(rotation)) - .replace("$HOST", host) - .replace("$PARTITION", String.valueOf(partitionIndex)); + .replace("$TIME", dateFormat.format(new Date(timeStamp))) + .replace("$NUM", String.valueOf(rotation)) + .replace("$HOST", host) + .replace("$PARTITION", String.valueOf(partitionIndex)); return ret; } @@ -73,7 +67,7 @@ public class SimpleFileNameFormat implements FileNameFormat { * $NUM - rotation number<br/> * $HOST - local host name<br/> * $PARTITION - partition index<br/> - * + * * @param name * file name * @return @@ -85,10 +79,10 @@ public class SimpleFileNameFormat implements FileNameFormat { public SimpleFileNameFormat withTimeFormat(String timeFormat) { //check format - try{ + try { new SimpleDateFormat(timeFormat); - }catch (Exception e) { - throw new IllegalArgumentException("invalid timeFormat: "+e.getMessage()); + } catch (Exception e) { + throw new IllegalArgumentException("invalid timeFormat: " + e.getMessage()); } this.timeFormat = timeFormat; return this;
