http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java index 53bcc4c..d4a95ee 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.common; @@ -25,11 +19,11 @@ import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.tuple.Tuple; abstract public class AbstractHDFSWriter implements Writer { + final protected Path filePath; + final protected FileRotationPolicy rotationPolicy; protected long lastUsedTime; protected long offset; protected boolean needsRotation; - final protected Path filePath; - final protected FileRotationPolicy rotationPolicy; public AbstractHDFSWriter(FileRotationPolicy policy, Path path) { //This must be defensively copied, because a bolt probably has only one rotation policy object
http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java index 6e957c2..d77423c 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java @@ -1,23 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.common; +import java.io.IOException; +import java.util.EnumSet; import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; @@ -31,9 +28,6 @@ import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.EnumSet; - public class AvroGenericRecordHDFSWriter extends AbstractHDFSWriter { private static final Logger LOG = LoggerFactory.getLogger(AvroGenericRecordHDFSWriter.class); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java index d69d770..578bc06 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java @@ -1,22 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.common; +import java.io.IOException; +import java.util.EnumSet; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; @@ -26,10 +23,7 @@ import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.EnumSet; - -public class HDFSWriter extends AbstractHDFSWriter{ +public class HDFSWriter extends AbstractHDFSWriter { private static final Logger LOG = LoggerFactory.getLogger(HDFSWriter.class); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java index 5ec5333..462087e 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java @@ -1,100 +1,93 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.common; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.ipc.RemoteException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; - public class HdfsUtils { - /** list files sorted by modification time that have not been modified since 'olderThan'. if - * 'olderThan' is <= 0 then the filtering is disabled */ - public static ArrayList<Path> listFilesByModificationTime(FileSystem fs, Path directory, long olderThan) - throws IOException { - ArrayList<LocatedFileStatus> fstats = new ArrayList<>(); + /** list files sorted by modification time that have not been modified since 'olderThan'. if + * 'olderThan' is <= 0 then the filtering is disabled */ + public static ArrayList<Path> listFilesByModificationTime(FileSystem fs, Path directory, long olderThan) + throws IOException { + ArrayList<LocatedFileStatus> fstats = new ArrayList<>(); - RemoteIterator<LocatedFileStatus> itr = fs.listFiles(directory, false); - while( itr.hasNext() ) { - LocatedFileStatus fileStatus = itr.next(); - if(olderThan>0) { - if( fileStatus.getModificationTime()<=olderThan ) - fstats.add(fileStatus); - } - else { - fstats.add(fileStatus); - } - } - Collections.sort(fstats, new ModifTimeComparator() ); + RemoteIterator<LocatedFileStatus> itr = fs.listFiles(directory, false); + while (itr.hasNext()) { + LocatedFileStatus fileStatus = itr.next(); + if (olderThan > 0) { + if (fileStatus.getModificationTime() <= olderThan) { + fstats.add(fileStatus); + } + } else { + fstats.add(fileStatus); + } + } + Collections.sort(fstats, new ModifTimeComparator()); - ArrayList<Path> result = new ArrayList<>(fstats.size()); - for (LocatedFileStatus fstat : fstats) { - result.add(fstat.getPath()); + ArrayList<Path> result = new ArrayList<>(fstats.size()); + for (LocatedFileStatus fstat : fstats) { + result.add(fstat.getPath()); + } + return result; } - return result; - } - /** - * Returns null if file already exists. throws if there was unexpected problem - */ - public static FSDataOutputStream tryCreateFile(FileSystem fs, Path file) throws IOException { - try { - FSDataOutputStream os = fs.create(file, false); - return os; - } catch (FileAlreadyExistsException e) { - return null; - } catch (RemoteException e) { - if( e.unwrapRemoteException() instanceof AlreadyBeingCreatedException ) { - return null; - } else { // unexpected error - throw e; - } + /** + * Returns null if file already exists. throws if there was unexpected problem + */ + public static FSDataOutputStream tryCreateFile(FileSystem fs, Path file) throws IOException { + try { + FSDataOutputStream os = fs.create(file, false); + return os; + } catch (FileAlreadyExistsException e) { + return null; + } catch (RemoteException e) { + if (e.unwrapRemoteException() instanceof AlreadyBeingCreatedException) { + return null; + } else { // unexpected error + throw e; + } + } } - } - public static class Pair<K,V> { - private K key; - private V value; - public Pair(K key, V value) { - this.key = key; - this.value = value; - } + public static class Pair<K, V> { + private K key; + private V value; - public K getKey() { - return key; - } + public Pair(K key, V value) { + this.key = key; + this.value = value; + } - public V getValue() { - return value; - } + public static <K, V> Pair of(K key, V value) { + return new Pair(key, value); + } - public static <K,V> Pair of(K key, V value) { - return new Pair(key,value); - } - } // class Pair + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + } // class Pair } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java index 0558b3f..47ebdfe 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java @@ -1,32 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.common; -import org.apache.hadoop.fs.FileStatus; - import java.util.Comparator; +import org.apache.hadoop.fs.FileStatus; public class ModifTimeComparator - implements Comparator<FileStatus> { - @Override + implements Comparator<FileStatus> { + @Override public int compare(FileStatus o1, FileStatus o2) { - return new Long(o1.getModificationTime()).compareTo( o2.getModificationTime() ); + return new Long(o1.getModificationTime()).compareTo(o2.getModificationTime()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java index fd50496..3137f48 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.common; import org.apache.storm.tuple.Tuple; http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java index 6cf0fbd..9f79373 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java @@ -1,27 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hdfs.common; -import org.apache.storm.tuple.Tuple; +package org.apache.storm.hdfs.common; import java.io.Serializable; +import org.apache.storm.tuple.Tuple; -public interface Partitioner extends Serializable{ +public interface Partitioner extends Serializable { /** * Return a relative path that the tuple should be written to. For example, if an HdfsBolt were configured to write http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java index ec78fd6..d0507b8 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java @@ -1,22 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.common; +import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.storm.hdfs.bolt.format.SequenceFormat; @@ -25,9 +21,7 @@ import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - -public class SequenceFileWriter extends AbstractHDFSWriter{ +public class SequenceFileWriter extends AbstractHDFSWriter { private static final Logger LOG = LoggerFactory.getLogger(SequenceFileWriter.class); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/MoveFileAction.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/MoveFileAction.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/MoveFileAction.java index 585307d..0e8e1dd 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/MoveFileAction.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/MoveFileAction.java @@ -15,21 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.hdfs.common.rotation; +import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - public class MoveFileAction implements RotationAction { private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class); private String destination; - public MoveFileAction toDestination(String destDir){ + public MoveFileAction toDestination(String destDir) { destination = destDir; return this; } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java index b15c314..f5ade03 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java @@ -15,14 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.hdfs.common.rotation; +package org.apache.storm.hdfs.common.rotation; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import java.io.IOException; import java.io.Serializable; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; public interface RotationAction extends Serializable { void execute(FileSystem fileSystem, Path filePath) throws IOException; http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java index e1339df..6d5537b 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.spout; @@ -24,37 +18,41 @@ import org.apache.hadoop.fs.Path; abstract class AbstractFileReader implements FileReader { - private final Path file; + private final Path file; - public AbstractFileReader(FileSystem fs, Path file) { - if (fs == null ) { - throw new IllegalArgumentException("filesystem arg cannot be null for reader"); + public AbstractFileReader(FileSystem fs, Path file) { + if (fs == null) { + throw new IllegalArgumentException("filesystem arg cannot be null for reader"); + } + if (file == null) { + throw new IllegalArgumentException("file arg cannot be null for reader"); + } + this.file = file; } - if (file == null ) { - throw new IllegalArgumentException("file arg cannot be null for reader"); - } - this.file = file; - } - @Override - public Path getFilePath() { - return file; - } + @Override + public Path getFilePath() { + return file; + } - @Override - public boolean equals(Object o) { - if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { return false; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } - AbstractFileReader that = (AbstractFileReader) o; + AbstractFileReader that = (AbstractFileReader) o; - return !(file != null ? !file.equals(that.file) : that.file != null); - } + return !(file != null ? !file.equals(that.file) : that.file != null); + } - @Override - public int hashCode() { - return file != null ? file.hashCode() : 0; - } + @Override + public int hashCode() { + return file != null ? file.hashCode() : 0; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java index cb2607a..f94b8e5 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.spout; @@ -29,13 +23,6 @@ import org.apache.storm.validation.NotConf; import org.apache.storm.validation.Validated; public class Configs implements Validated { - public static class ReaderTypeValidator extends Validator { - @Override - public void validateField(String name, Object o) { - HdfsSpout.checkValidReader((String)o); - } - } - /** * @deprecated please use {@link HdfsSpout.setReaderType(String)} */ @@ -45,7 +32,6 @@ public class Configs implements Validated { public static final String READER_TYPE = "hdfsspout.reader.type"; // Required - chose the file type being consumed public static final String TEXT = "text"; public static final String SEQ = "seq"; - /** * @deprecated please use {@link HdfsSpout#setHdfsUri(String)} */ @@ -81,7 +67,7 @@ public class Configs implements Validated { */ @Deprecated @isInteger - @isPositiveNumber(includeZero=true) + @isPositiveNumber(includeZero = true) public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count"; // commit after N records. 0 disables this. /** * @deprecated please use {@link HdfsSpout#setCommitFrequencySec(int)} @@ -95,7 +81,7 @@ public class Configs implements Validated { */ @Deprecated @isInteger - @isPositiveNumber(includeZero=true) + @isPositiveNumber(includeZero = true) public static final String MAX_OUTSTANDING = "hdfsspout.max.outstanding"; /** * @deprecated please use {@link HdfsSpout#setLockTimeoutSec(int)} @@ -103,27 +89,34 @@ public class Configs implements Validated { @Deprecated @isInteger @isPositiveNumber - public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec"; // inactivity duration after which locks are considered candidates for being reassigned to another spout + public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec"; /** * @deprecated please use {@link HdfsSpout#setClocksInSync(boolean)} */ @Deprecated @isBoolean public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync"; // if clocks on machines in the Storm cluster are in sync + // inactivity duration after which locks are considered candidates for being reassigned to another spout /** * @deprecated please use {@link HdfsSpout#setIgnoreSuffix(String)} */ @Deprecated @isString - public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix"; // filenames with this suffix in archive dir will be ignored by the Spout - + public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix"; @NotConf public static final String DEFAULT_LOCK_DIR = ".lock"; + // filenames with this suffix in archive dir will be ignored by the Spout public static final int DEFAULT_COMMIT_FREQ_COUNT = 20000; public static final int DEFAULT_COMMIT_FREQ_SEC = 10; public static final int DEFAULT_MAX_OUTSTANDING = 10000; public static final int DEFAULT_LOCK_TIMEOUT = 5 * 60; // 5 min - @isMapEntryType(keyType = String.class, valueType = String.class) public static final String DEFAULT_HDFS_CONFIG_KEY = "hdfs.config"; + + public static class ReaderTypeValidator extends Validator { + @Override + public void validateField(String name, Object o) { + HdfsSpout.checkValidReader((String) o); + } + } } // class Configs http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java index 25a136c..eea23e1 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java @@ -1,23 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.spout; +import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -26,108 +21,107 @@ import org.apache.storm.hdfs.common.HdfsUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - /** * Facility to synchronize access to HDFS directory. The lock itself is represented * as a file in the same directory. Relies on atomic file creation. */ public class DirLock { - private FileSystem fs; - private final Path lockFile; - public static final String DIR_LOCK_FILE = "DIRLOCK"; - private static final Logger LOG = LoggerFactory.getLogger(DirLock.class); - private DirLock(FileSystem fs, Path lockFile) throws IOException { - if( fs.isDirectory(lockFile) ) { - throw new IllegalArgumentException(lockFile.toString() + " is not a directory"); - } - this.fs = fs; - this.lockFile = lockFile; - } - - /** Get a lock on file if not already locked - * - * @param fs - * @param dir the dir on which to get a lock - * @return The lock object if it the lock was acquired. Returns null if the dir is already locked. - * @throws IOException if there were errors - */ - public static DirLock tryLock(FileSystem fs, Path dir) throws IOException { - Path lockFile = getDirLockFile(dir); + public static final String DIR_LOCK_FILE = "DIRLOCK"; + private static final Logger LOG = LoggerFactory.getLogger(DirLock.class); + private final Path lockFile; + private FileSystem fs; - try { - FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile); - if (ostream!=null) { - LOG.debug("Thread ({}) Acquired lock on dir {}", threadInfo(), dir); - ostream.close(); - return new DirLock(fs, lockFile); - } else { - LOG.debug("Thread ({}) cannot lock dir {} as its already locked.", threadInfo(), dir); - return null; - } - } catch (IOException e) { - LOG.error("Error when acquiring lock on dir " + dir, e); - throw e; + private DirLock(FileSystem fs, Path lockFile) throws IOException { + if (fs.isDirectory(lockFile)) { + throw new IllegalArgumentException(lockFile.toString() + " is not a directory"); + } + this.fs = fs; + this.lockFile = lockFile; } - } - private static Path getDirLockFile(Path dir) { - return new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE ); - } + /** Get a lock on file if not already locked + * + * @param fs + * @param dir the dir on which to get a lock + * @return The lock object if it the lock was acquired. Returns null if the dir is already locked. + * @throws IOException if there were errors + */ + public static DirLock tryLock(FileSystem fs, Path dir) throws IOException { + Path lockFile = getDirLockFile(dir); - private static String threadInfo () { - return "ThdId=" + Thread.currentThread().getId() + ", ThdName=" - + Thread.currentThread().getName(); - } + try { + FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile); + if (ostream != null) { + LOG.debug("Thread ({}) Acquired lock on dir {}", threadInfo(), dir); + ostream.close(); + return new DirLock(fs, lockFile); + } else { + LOG.debug("Thread ({}) cannot lock dir {} as its already locked.", threadInfo(), dir); + return null; + } + } catch (IOException e) { + LOG.error("Error when acquiring lock on dir " + dir, e); + throw e; + } + } - /** Release lock on dir by deleting the lock file */ - public void release() throws IOException { - if(!fs.delete(lockFile, false)) { - LOG.error("Thread {} could not delete dir lock {} ", threadInfo(), lockFile); + private static Path getDirLockFile(Path dir) { + return new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE); } - else { - LOG.debug("Thread {} Released dir lock {} ", threadInfo(), lockFile); + + private static String threadInfo() { + return "ThdId=" + Thread.currentThread().getId() + ", ThdName=" + + Thread.currentThread().getName(); } - } - /** if the lock on the directory is stale, take ownership */ - public static DirLock takeOwnershipIfStale(FileSystem fs, Path dirToLock, int lockTimeoutSec) { - Path dirLockFile = getDirLockFile(dirToLock); + /** if the lock on the directory is stale, take ownership */ + public static DirLock takeOwnershipIfStale(FileSystem fs, Path dirToLock, int lockTimeoutSec) { + Path dirLockFile = getDirLockFile(dirToLock); - long now = System.currentTimeMillis(); - long expiryTime = now - (lockTimeoutSec*1000); + long now = System.currentTimeMillis(); + long expiryTime = now - (lockTimeoutSec * 1000); - try { - long modTime = fs.getFileStatus(dirLockFile).getModificationTime(); - if(modTime <= expiryTime) { - return takeOwnership(fs, dirLockFile); - } - return null; - } catch (IOException e) { - return null; + try { + long modTime = fs.getFileStatus(dirLockFile).getModificationTime(); + if (modTime <= expiryTime) { + return takeOwnership(fs, dirLockFile); + } + return null; + } catch (IOException e) { + return null; + } } - } - private static DirLock takeOwnership(FileSystem fs, Path dirLockFile) throws IOException { - if(fs instanceof DistributedFileSystem) { - if (!((DistributedFileSystem) fs).recoverLease(dirLockFile)) { - LOG.warn("Unable to recover lease on dir lock file " + dirLockFile + " right now. Cannot transfer ownership. Will need to try later."); + private static DirLock takeOwnership(FileSystem fs, Path dirLockFile) throws IOException { + if (fs instanceof DistributedFileSystem) { + if (!((DistributedFileSystem) fs).recoverLease(dirLockFile)) { + LOG.warn("Unable to recover lease on dir lock file " + dirLockFile + + " right now. Cannot transfer ownership. Will need to try later."); + return null; + } + } + + // delete and recreate lock file + if (fs.delete(dirLockFile, false)) { // returns false if somebody else already deleted it (to take ownership) + FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, dirLockFile); + if (ostream != null) { + ostream.close(); + } + return new DirLock(fs, dirLockFile); + } return null; - } } - // delete and recreate lock file - if( fs.delete(dirLockFile, false) ) { // returns false if somebody else already deleted it (to take ownership) - FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, dirLockFile); - if(ostream!=null) { - ostream.close(); - } - return new DirLock(fs, dirLockFile); + /** Release lock on dir by deleting the lock file */ + public void release() throws IOException { + if (!fs.delete(lockFile, false)) { + LOG.error("Thread {} could not delete dir lock {} ", threadInfo(), lockFile); + } else { + LOG.debug("Thread {} Released dir lock {} ", threadInfo(), lockFile); + } } - return null; - } - public Path getLockFile() { - return lockFile; - } + public Path getLockFile() { + return lockFile; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java index a7cb2b8..c5a2f55 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java @@ -1,24 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.spout; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Collection; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -30,11 +28,6 @@ import org.apache.storm.hdfs.common.HdfsUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Collection; - /** * Facility to synchronize access to HDFS files. Thread gains exclusive access to a file by acquiring * a FileLock object. The lock itself is represented as file on HDFS. Relies on atomic file creation. @@ -43,291 +36,303 @@ import java.util.Collection; */ public class FileLock { - private final FileSystem fs; - private final String componentID; - private final Path lockFile; - private final FSDataOutputStream lockFileStream; - private LogEntry lastEntry; - - private static final Logger LOG = LoggerFactory.getLogger(FileLock.class); - - private FileLock(FileSystem fs, Path lockFile, FSDataOutputStream lockFileStream, String spoutId) - throws IOException { - this.fs = fs; - this.lockFile = lockFile; - this.lockFileStream = lockFileStream; - this.componentID = spoutId; - logProgress("0", false); - } - - private FileLock(FileSystem fs, Path lockFile, String spoutId, LogEntry entry) - throws IOException { - this.fs = fs; - this.lockFile = lockFile; - this.lockFileStream = fs.append(lockFile); - this.componentID = spoutId; - LOG.info("Acquired abandoned lockFile {}, Spout {}", lockFile, spoutId); - logProgress(entry.fileOffset, true); - } - - public void heartbeat(String fileOffset) throws IOException { - logProgress(fileOffset, true); - } - - // new line is at beginning of each line (instead of end) for better recovery from - // partial writes of prior lines - private void logProgress(String fileOffset, boolean prefixNewLine) - throws IOException { - long now = System.currentTimeMillis(); - LogEntry entry = new LogEntry(now, componentID, fileOffset); - String line = entry.toString(); - if(prefixNewLine) { - lockFileStream.writeBytes(System.lineSeparator() + line); + private static final Logger LOG = LoggerFactory.getLogger(FileLock.class); + private final FileSystem fs; + private final String componentID; + private final Path lockFile; + private final FSDataOutputStream lockFileStream; + private LogEntry lastEntry; + + private FileLock(FileSystem fs, Path lockFile, FSDataOutputStream lockFileStream, String spoutId) + throws IOException { + this.fs = fs; + this.lockFile = lockFile; + this.lockFileStream = lockFileStream; + this.componentID = spoutId; + logProgress("0", false); } - else { - lockFileStream.writeBytes(line); + + private FileLock(FileSystem fs, Path lockFile, String spoutId, LogEntry entry) + throws IOException { + this.fs = fs; + this.lockFile = lockFile; + this.lockFileStream = fs.append(lockFile); + this.componentID = spoutId; + LOG.info("Acquired abandoned lockFile {}, Spout {}", lockFile, spoutId); + logProgress(entry.fileOffset, true); } - lockFileStream.hflush(); - - lastEntry = entry; // update this only after writing to hdfs - } - - /** Release lock by deleting file - * @throws IOException if lock file could not be deleted - */ - public void release() throws IOException { - lockFileStream.close(); - if(!fs.delete(lockFile, false)) { - LOG.warn("Unable to delete lock file, Spout = {}", componentID); - throw new IOException("Unable to delete lock file"); + + /** returns lock on file or null if file is already locked. throws if unexpected problem */ + public static FileLock tryLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId) + throws IOException { + Path lockFile = new Path(lockDirPath, fileToLock.getName()); + + try { + FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile); + if (ostream != null) { + LOG.debug("Acquired lock on file {}. LockFile= {}, Spout = {}", fileToLock, lockFile, spoutId); + return new FileLock(fs, lockFile, ostream, spoutId); + } else { + LOG.debug("Cannot lock file {} as its already locked. Spout = {}", fileToLock, spoutId); + return null; + } + } catch (IOException e) { + LOG.error("Error when acquiring lock on file " + fileToLock + " Spout = " + spoutId, e); + throw e; + } } - LOG.debug("Released lock file {}. Spout {}", lockFile, componentID); - } - - // For testing only.. invoked via reflection - private void forceCloseLockFile() throws IOException { - lockFileStream.close(); - } - - /** returns lock on file or null if file is already locked. throws if unexpected problem */ - public static FileLock tryLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId) - throws IOException { - Path lockFile = new Path(lockDirPath, fileToLock.getName()); - - try { - FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile); - if (ostream != null) { - LOG.debug("Acquired lock on file {}. LockFile= {}, Spout = {}", fileToLock, lockFile, spoutId); - return new FileLock(fs, lockFile, ostream, spoutId); - } else { - LOG.debug("Cannot lock file {} as its already locked. Spout = {}", fileToLock, spoutId); + + /** + * checks if lockFile is older than 'olderThan' UTC time by examining the modification time + * on file and (if necessary) the timestamp in last log entry in the file. If its stale, then + * returns the last log entry, else returns null. + * @param fs + * @param lockFile + * @param olderThan time (millis) in UTC. + * @return the last entry in the file if its too old. null if last entry is not too old + * @throws IOException + */ + public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan) + throws IOException { + long modifiedTime = fs.getFileStatus(lockFile).getModificationTime(); + if (modifiedTime <= olderThan) { // look + //Impt: HDFS timestamp may not reflect recent appends, so we double check the + // timestamp in last line of file to see when the last update was made + LogEntry lastEntry = getLastEntry(fs, lockFile); + if (lastEntry == null) { + LOG.warn("Empty lock file found. Deleting it. {}", lockFile); + try { + if (!fs.delete(lockFile, false)) { + throw new IOException("Empty lock file deletion failed"); + } + } catch (Exception e) { + LOG.error("Unable to delete empty lock file " + lockFile, e); + } + } + if (lastEntry.eventTime <= olderThan) { + return lastEntry; + } + } return null; - } - } catch (IOException e) { - LOG.error("Error when acquiring lock on file " + fileToLock + " Spout = " + spoutId, e); - throw e; } - } - - /** - * checks if lockFile is older than 'olderThan' UTC time by examining the modification time - * on file and (if necessary) the timestamp in last log entry in the file. If its stale, then - * returns the last log entry, else returns null. - * @param fs - * @param lockFile - * @param olderThan time (millis) in UTC. - * @return the last entry in the file if its too old. null if last entry is not too old - * @throws IOException - */ - public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan) - throws IOException { - long modifiedTime = fs.getFileStatus(lockFile).getModificationTime(); - if( modifiedTime <= olderThan ) { // look - //Impt: HDFS timestamp may not reflect recent appends, so we double check the - // timestamp in last line of file to see when the last update was made - LogEntry lastEntry = getLastEntry(fs, lockFile); - if(lastEntry==null) { - LOG.warn("Empty lock file found. Deleting it. {}", lockFile); - try { - if(!fs.delete(lockFile, false)) - throw new IOException("Empty lock file deletion failed"); - } catch (Exception e) { - LOG.error("Unable to delete empty lock file " + lockFile, e); + + /** + * returns the last log entry + * @param fs + * @param lockFile + * @return + * @throws IOException + */ + public static LogEntry getLastEntry(FileSystem fs, Path lockFile) + throws IOException { + FSDataInputStream in = fs.open(lockFile); + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + String lastLine = null; + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + lastLine = line; } - } - if( lastEntry.eventTime <= olderThan ) - return lastEntry; + return LogEntry.deserialize(lastLine); } - return null; - } - - /** - * returns the last log entry - * @param fs - * @param lockFile - * @return - * @throws IOException - */ - public static LogEntry getLastEntry(FileSystem fs, Path lockFile) - throws IOException { - FSDataInputStream in = fs.open(lockFile); - BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - String lastLine = null; - for(String line = reader.readLine(); line!=null; line = reader.readLine() ) { - lastLine=line; + + /** + * Takes ownership of the lock file if possible. + * @param lockFile + * @param lastEntry last entry in the lock file. this param is an optimization. + * we dont scan the lock file again to find its last entry here since + * its already been done once by the logic used to check if the lock + * file is stale. so this value comes from that earlier scan. + * @param spoutId spout id + * @throws IOException if unable to acquire + * @return null if lock File is not recoverable + */ + public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId) + throws IOException { + try { + if (fs instanceof DistributedFileSystem) { + if (!((DistributedFileSystem) fs).recoverLease(lockFile)) { + LOG.warn( + "Unable to recover lease on lock file {} right now. Cannot transfer ownership. Will need to try later. Spout = {}", + lockFile, spoutId); + return null; + } + } + return new FileLock(fs, lockFile, spoutId, lastEntry); + } catch (IOException e) { + if (e instanceof RemoteException && + ((RemoteException) e).unwrapRemoteException() instanceof AlreadyBeingCreatedException) { + LOG.warn( + "Lock file " + lockFile + "is currently open. Cannot transfer ownership now. Will need to try later. Spout= " + spoutId, + e); + return null; + } else { // unexpected error + LOG.warn("Cannot transfer ownership now for lock file " + lockFile + ". Will need to try later. Spout =" + spoutId, e); + throw e; + } + } } - return LogEntry.deserialize(lastLine); - } - - /** - * Takes ownership of the lock file if possible. - * @param lockFile - * @param lastEntry last entry in the lock file. this param is an optimization. - * we dont scan the lock file again to find its last entry here since - * its already been done once by the logic used to check if the lock - * file is stale. so this value comes from that earlier scan. - * @param spoutId spout id - * @throws IOException if unable to acquire - * @return null if lock File is not recoverable - */ - public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId) - throws IOException { - try { - if(fs instanceof DistributedFileSystem ) { - if( !((DistributedFileSystem) fs).recoverLease(lockFile) ) { - LOG.warn("Unable to recover lease on lock file {} right now. Cannot transfer ownership. Will need to try later. Spout = {}", lockFile, spoutId); - return null; + + /** + * Finds a oldest expired lock file (using modification timestamp), then takes + * ownership of the lock file + * Impt: Assumes access to lockFilesDir has been externally synchronized such that + * only one thread accessing the same thread + * @param fs + * @param lockFilesDir + * @param locktimeoutSec + * @return + */ + public static FileLock acquireOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId) + throws IOException { + // list files + long now = System.currentTimeMillis(); + long olderThan = now - (locktimeoutSec * 1000); + Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan); + + // locate expired lock files (if any). Try to take ownership (oldest lock first) + for (Path file : listing) { + if (file.getName().equalsIgnoreCase(DirLock.DIR_LOCK_FILE)) { + continue; + } + LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan); + if (lastEntry != null) { + FileLock lock = FileLock.takeOwnership(fs, file, lastEntry, spoutId); + if (lock != null) { + return lock; + } + } + } + if (listing.isEmpty()) { + LOG.debug("No abandoned lock files found by Spout {}", spoutId); } - } - return new FileLock(fs, lockFile, spoutId, lastEntry); - } catch (IOException e) { - if (e instanceof RemoteException && - ((RemoteException) e).unwrapRemoteException() instanceof AlreadyBeingCreatedException) { - LOG.warn("Lock file " + lockFile + "is currently open. Cannot transfer ownership now. Will need to try later. Spout= " + spoutId, e); return null; - } else { // unexpected error - LOG.warn("Cannot transfer ownership now for lock file " + lockFile + ". Will need to try later. Spout =" + spoutId, e); - throw e; - } } - } - - /** - * Finds a oldest expired lock file (using modification timestamp), then takes - * ownership of the lock file - * Impt: Assumes access to lockFilesDir has been externally synchronized such that - * only one thread accessing the same thread - * @param fs - * @param lockFilesDir - * @param locktimeoutSec - * @return - */ - public static FileLock acquireOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId) - throws IOException { - // list files - long now = System.currentTimeMillis(); - long olderThan = now - (locktimeoutSec*1000); - Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan); - - // locate expired lock files (if any). Try to take ownership (oldest lock first) - for (Path file : listing) { - if(file.getName().equalsIgnoreCase( DirLock.DIR_LOCK_FILE) ) { - continue; - } - LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan); - if(lastEntry!=null) { - FileLock lock = FileLock.takeOwnership(fs, file, lastEntry, spoutId); - if(lock!=null) { - return lock; + + /** + * Finds oldest expired lock file (using modification timestamp), then takes + * ownership of the lock file + * Impt: Assumes access to lockFilesDir has been externally synchronized such that + * only one thread accessing the same thread + * @param fs + * @param lockFilesDir + * @param locktimeoutSec + * @return a Pair<lock file path, last entry in lock file> .. if expired lock file found + * @throws IOException + */ + public static HdfsUtils.Pair<Path, LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec) + throws IOException { + // list files + long now = System.currentTimeMillis(); + long olderThan = now - (locktimeoutSec * 1000); + Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan); + + // locate oldest expired lock file (if any) and take ownership + for (Path file : listing) { + if (file.getName().equalsIgnoreCase(DirLock.DIR_LOCK_FILE)) { + continue; + } + LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan); + if (lastEntry != null) { + return new HdfsUtils.Pair<>(file, lastEntry); + } } - } + LOG.debug("No abandoned files found"); + return null; } - if(listing.isEmpty()) { - LOG.debug("No abandoned lock files found by Spout {}", spoutId); + + public void heartbeat(String fileOffset) throws IOException { + logProgress(fileOffset, true); } - return null; - } - - - /** - * Finds oldest expired lock file (using modification timestamp), then takes - * ownership of the lock file - * Impt: Assumes access to lockFilesDir has been externally synchronized such that - * only one thread accessing the same thread - * @param fs - * @param lockFilesDir - * @param locktimeoutSec - * @return a Pair<lock file path, last entry in lock file> .. if expired lock file found - * @throws IOException - */ - public static HdfsUtils.Pair<Path,LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec) - throws IOException { - // list files - long now = System.currentTimeMillis(); - long olderThan = now - (locktimeoutSec*1000); - Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan); - - // locate oldest expired lock file (if any) and take ownership - for (Path file : listing) { - if(file.getName().equalsIgnoreCase( DirLock.DIR_LOCK_FILE) ) { - continue; - } - LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan); - if(lastEntry!=null) { - return new HdfsUtils.Pair<>(file, lastEntry); - } + + // new line is at beginning of each line (instead of end) for better recovery from + // partial writes of prior lines + private void logProgress(String fileOffset, boolean prefixNewLine) + throws IOException { + long now = System.currentTimeMillis(); + LogEntry entry = new LogEntry(now, componentID, fileOffset); + String line = entry.toString(); + if (prefixNewLine) { + lockFileStream.writeBytes(System.lineSeparator() + line); + } else { + lockFileStream.writeBytes(line); + } + lockFileStream.hflush(); + + lastEntry = entry; // update this only after writing to hdfs + } + + /** Release lock by deleting file + * @throws IOException if lock file could not be deleted + */ + public void release() throws IOException { + lockFileStream.close(); + if (!fs.delete(lockFile, false)) { + LOG.warn("Unable to delete lock file, Spout = {}", componentID); + throw new IOException("Unable to delete lock file"); + } + LOG.debug("Released lock file {}. Spout {}", lockFile, componentID); } - LOG.debug("No abandoned files found"); - return null; - } - - public LogEntry getLastLogEntry() { - return lastEntry; - } - - public Path getLockFile() { - return lockFile; - } - - public static class LogEntry { - private static final int NUM_FIELDS = 3; - public final long eventTime; - public final String componentID; - public final String fileOffset; - - public LogEntry(long eventtime, String componentID, String fileOffset) { - this.eventTime = eventtime; - this.componentID = componentID; - this.fileOffset = fileOffset; + + // For testing only.. invoked via reflection + private void forceCloseLockFile() throws IOException { + lockFileStream.close(); } - public String toString() { - return eventTime + "," + componentID + "," + fileOffset; + public LogEntry getLastLogEntry() { + return lastEntry; } - public static LogEntry deserialize(String line) { - String[] fields = line.split(",", NUM_FIELDS); - return new LogEntry(Long.parseLong(fields[0]), fields[1], fields[2]); + + public Path getLockFile() { + return lockFile; } - @Override - public boolean equals(Object o) { - if (this == o) { return true; } - if (!(o instanceof LogEntry)) { return false; } + public static class LogEntry { + private static final int NUM_FIELDS = 3; + public final long eventTime; + public final String componentID; + public final String fileOffset; - LogEntry logEntry = (LogEntry) o; + public LogEntry(long eventtime, String componentID, String fileOffset) { + this.eventTime = eventtime; + this.componentID = componentID; + this.fileOffset = fileOffset; + } - if (eventTime != logEntry.eventTime) { return false; } - if (!componentID.equals(logEntry.componentID)) { return false; } - return fileOffset.equals(logEntry.fileOffset); + public static LogEntry deserialize(String line) { + String[] fields = line.split(",", NUM_FIELDS); + return new LogEntry(Long.parseLong(fields[0]), fields[1], fields[2]); + } - } + public String toString() { + return eventTime + "," + componentID + "," + fileOffset; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof LogEntry)) { + return false; + } + + LogEntry logEntry = (LogEntry) o; + + if (eventTime != logEntry.eventTime) { + return false; + } + if (!componentID.equals(logEntry.componentID)) { + return false; + } + return fileOffset.equals(logEntry.fileOffset); - @Override - public int hashCode() { - int result = (int) (eventTime ^ (eventTime >>> 32)); - result = 31 * result + componentID.hashCode(); - result = 31 * result + fileOffset.hashCode(); - return result; + } + + @Override + public int hashCode() { + int result = (int) (eventTime ^ (eventTime >>> 32)); + result = 31 * result + componentID.hashCode(); + result = 31 * result + fileOffset.hashCode(); + return result; + } } - } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java index 78296b9..bf58815 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.spout; @@ -30,7 +24,8 @@ package org.apache.storm.hdfs.spout; */ interface FileOffset extends Comparable<FileOffset>, Cloneable { - /** tests if rhs == currOffset+1 */ - boolean isNextOffset(FileOffset rhs); - FileOffset clone(); + /** tests if rhs == currOffset+1 */ + boolean isNextOffset(FileOffset rhs); + + FileOffset clone(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java index 54a90d4..49d998a 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java @@ -1,44 +1,37 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.spout; -import org.apache.hadoop.fs.Path; - import java.io.IOException; import java.util.List; +import org.apache.hadoop.fs.Path; interface FileReader { - Path getFilePath(); + Path getFilePath(); - /** - * A simple numeric value may not be sufficient for certain formats consequently - * this is a String. - */ - FileOffset getFileOffset(); + /** + * A simple numeric value may not be sufficient for certain formats consequently + * this is a String. + */ + FileOffset getFileOffset(); - /** - * Get the next tuple from the file - * - * @return null if no more data - * @throws IOException - */ - List<Object> next() throws IOException, ParseException; + /** + * Get the next tuple from the file + * + * @return null if no more data + * @throws IOException + */ + List<Object> next() throws IOException, ParseException; - void close(); + void close(); }
