// http://git-wip-us.apache.org/repos/asf/oodt/blob/098cc4fa/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DataFileToPropFileLinker.java
// ----------------------------------------------------------------------
// diff --git a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DataFileToPropFileLinker.java b/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DataFileToPropFileLinker.java
// deleted file mode 100644
// index 5bd4be3..0000000
// --- a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DataFileToPropFileLinker.java
// +++ /dev/null
// @@ -1,183 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.oodt.cas.pushpull.retrievalsystem;

//JDK imports
import java.io.File;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.LinkedList;
import java.util.Map.Entry;

//OODT imports
import org.apache.oodt.cas.protocol.ProtocolFile;

/**
 * Links property files to the remote data file paths registered for them,
 * accumulates a comma separated error string per property file and, as a
 * {@link DownloadListener}, keeps lists of the data files that are currently
 * downloading, that failed and that finished.
 * <p>
 * Every public method is synchronized on this instance.
 * </p>
 *
 * @author bfoster
 * @version $Revision$
 */
public class DataFileToPropFileLinker implements DownloadListener {

  /** Remote data file path mapped to the property file it was registered for. */
  private final ConcurrentHashMap<String, File> protocolFilePathAndPropFileMap;

  /** Property file mapped to its comma separated error messages. */
  private final ConcurrentHashMap<File, String> propFileToErrorsMap;

  /** Files for which downloadStarted was reported and no end was reported yet. */
  private final LinkedList<ProtocolFile> downloadingDataFiles;

  /** Files for which downloadFailed was reported. */
  private final LinkedList<ProtocolFile> failedDataFiles;

  /** Files for which downloadFinished was reported. */
  private final LinkedList<ProtocolFile> successDataFiles;

  /** Creates a linker with no links, no errors and empty download lists. */
  public DataFileToPropFileLinker() {
    this.protocolFilePathAndPropFileMap = new ConcurrentHashMap<String, File>();
    this.propFileToErrorsMap = new ConcurrentHashMap<File, String>();
    this.downloadingDataFiles = new LinkedList<ProtocolFile>();
    this.failedDataFiles = new LinkedList<ProtocolFile>();
    this.successDataFiles = new LinkedList<ProtocolFile>();
  }

  /**
   * Links the path of the given data file to the given property file.
   *
   * @param propFile
   *          the property file
   * @param pFile
   *          the data file whose {@code getPath()} is used as the key
   */
  public synchronized void addPropFileToDataFileLink(File propFile,
      ProtocolFile pFile) {
    this.addPropFileToDataFileLink(propFile, pFile.getPath());
  }

  /**
   * Links a remote data file path to the given property file, replacing any
   * link previously stored for that path.
   *
   * @param propFile
   *          the property file
   * @param remoteDataFilePath
   *          the data file path used as the key
   */
  public synchronized void addPropFileToDataFileLink(File propFile,
      String remoteDataFilePath) {
    this.protocolFilePathAndPropFileMap.put(remoteDataFilePath, propFile);
  }

  /**
   * Appends an error message to the errors recorded for a property file.
   * Messages are joined with a comma.
   *
   * @param propFile
   *          the property file the error belongs to
   * @param errorMsg
   *          the message to record
   */
  public synchronized void markAsFailed(File propFile, String errorMsg) {
    String errors = this.propFileToErrorsMap.get(propFile);
    this.propFileToErrorsMap.put(propFile,
        errors == null ? errorMsg : errors + "," + errorMsg);
  }

  /**
   * Records an error against the property file linked to the given data file.
   *
   * @param pFile
   *          the data file whose path is looked up
   * @param errorMsg
   *          the message to record
   */
  public synchronized void markAsFailed(ProtocolFile pFile, String errorMsg) {
    this.markAsFailed(pFile.getPath(), errorMsg);
  }

  /**
   * Records an error against the property file linked to the given data file
   * path. Does nothing when no property file is linked to that path.
   *
   * @param pFilePath
   *          the data file path to look up
   * @param errorMsg
   *          the message to record
   */
  public synchronized void markAsFailed(String pFilePath, String errorMsg) {
    File propFile = this.protocolFilePathAndPropFileMap.get(pFilePath);
    if (propFile != null) {
      this.markAsFailed(propFile, errorMsg);
    }
  }

  /**
   * Removes every link to the given property file, then removes and returns
   * the errors recorded for it.
   *
   * @param propFile
   *          the property file
   * @return the recorded errors, or {@code null} if there were none
   */
  public synchronized String getErrorsAndEraseLinks(File propFile) {
    this.eraseLinks(propFile);
    return this.getErrors(propFile);
  }

  /**
   * Removes every data file path that is linked to the given property file.
   *
   * @param propFile
   *          the property file whose links are dropped
   */
  public synchronized void eraseLinks(File propFile) {
    Iterator<Entry<String, File>> links = this.protocolFilePathAndPropFileMap
        .entrySet().iterator();
    while (links.hasNext()) {
      if (links.next().getValue().equals(propFile)) {
        links.remove();
      }
    }
  }

  /**
   * Removes and returns the errors recorded for the given property file.
   *
   * @param propFile
   *          the property file
   * @return the recorded errors, or {@code null} if there were none
   */
  public synchronized String getErrors(File propFile) {
    return this.propFileToErrorsMap.remove(propFile);
  }

  /**
   * Builds a multi-line status report for a property file: its recorded
   * errors followed by the linked data files that are downloading, that
   * downloaded successfully and that failed.
   *
   * @param propFile
   *          the property file to report on
   * @return the status text
   */
  public synchronized String getStatusOf(File propFile) {
    return "Status for " + propFile.getAbsolutePath() + ":" + " Errors: \n"
        + " " + this.propFileToErrorsMap.get(propFile) + "\n"
        + " Data files specified which are currently downloading:\n"
        + " " + this.getDownloadingFilesLinkedToPropFile(propFile)
        + "\n"
        + " Data files specified which successfully downloaded:\n"
        + " "
        + this.getSuccessfullyDownloadedFilesLinkedToPropFile(propFile)
        + "\n" + " Data files specified which failed to download:\n"
        + " "
        + this.getFailedToDownloadFilesLinkedToPropFile(propFile)
        + "\n";
  }

  /**
   * @param propFile
   *          the property file
   * @return the currently downloading files linked to {@code propFile}
   */
  public synchronized LinkedList<ProtocolFile> getDownloadingFilesLinkedToPropFile(
      File propFile) {
    return this.getFilesLinkedToPropFileInList(propFile,
        this.downloadingDataFiles);
  }

  /**
   * @param propFile
   *          the property file
   * @return the successfully downloaded files linked to {@code propFile}
   */
  public synchronized LinkedList<ProtocolFile> getSuccessfullyDownloadedFilesLinkedToPropFile(
      File propFile) {
    return this.getFilesLinkedToPropFileInList(propFile,
        this.successDataFiles);
  }

  /**
   * @param propFile
   *          the property file
   * @return the failed files linked to {@code propFile}
   */
  public synchronized LinkedList<ProtocolFile> getFailedToDownloadFilesLinkedToPropFile(
      File propFile) {
    return this.getFilesLinkedToPropFileInList(propFile,
        this.failedDataFiles);
  }

  /**
   * Filters {@code list} down to the files whose path is linked to
   * {@code propFile}.
   */
  private LinkedList<ProtocolFile> getFilesLinkedToPropFileInList(
      File propFile, LinkedList<ProtocolFile> list) {
    LinkedList<ProtocolFile> returnList = new LinkedList<ProtocolFile>();
    for (ProtocolFile pFile : list) {
      File linkedPropFile = this.protocolFilePathAndPropFileMap.get(pFile
          .getPath());
      // Compare against the requested property file; a bare non-null check
      // would also return files that belong to other property files.
      if (linkedPropFile != null && linkedPropFile.equals(propFile)) {
        returnList.add(pFile);
      }
    }
    return returnList;
  }

  /**
   * Records the failure against the linked property file and moves the file
   * from the downloading list to the failed list.
   */
  @Override
  public synchronized void downloadFailed(ProtocolFile file, String errorMsg) {
    this.markAsFailed(file, "Failed to download '" + file
        + "' -- Logged Error msg: " + errorMsg);
    this.failedDataFiles.add(file);
    this.downloadingDataFiles.remove(file);
  }

  /** Moves the file from the downloading list to the success list. */
  @Override
  public synchronized void downloadFinished(ProtocolFile file) {
    this.successDataFiles.add(file);
    this.downloadingDataFiles.remove(file);
  }

  /** Adds the file to the downloading list. */
  @Override
  public synchronized void downloadStarted(ProtocolFile file) {
    this.downloadingDataFiles.add(file);
  }

  /** Drops all links, all recorded errors and all three download lists. */
  public synchronized void clear() {
    this.propFileToErrorsMap.clear();
    this.protocolFilePathAndPropFileMap.clear();
    this.downloadingDataFiles.clear();
    this.failedDataFiles.clear();
    this.successDataFiles.clear();
  }
}
http://git-wip-us.apache.org/repos/asf/oodt/blob/098cc4fa/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DownloadListener.java ---------------------------------------------------------------------- diff --git a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DownloadListener.java b/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DownloadListener.java deleted file mode 100644 index 3db621c..0000000 --- a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DownloadListener.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.oodt.cas.pushpull.retrievalsystem; - -//OODT imports -import org.apache.oodt.cas.protocol.ProtocolFile; - -/** - * - * @author bfoster - * @version $Revision$ - * - * <p> - * Describe your class here - * </p>. 
- */ -public interface DownloadListener { - - void downloadStarted(ProtocolFile pFile); - - void downloadFinished(ProtocolFile pFile); - - void downloadFailed(ProtocolFile pFile, String errorMsg); - -} http://git-wip-us.apache.org/repos/asf/oodt/blob/098cc4fa/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DownloadThreadEvaluator.java ---------------------------------------------------------------------- diff --git a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DownloadThreadEvaluator.java b/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DownloadThreadEvaluator.java deleted file mode 100644 index a43dcd1..0000000 --- a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DownloadThreadEvaluator.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.oodt.cas.pushpull.retrievalsystem; - -//OODT imports - -import org.apache.oodt.cas.pushpull.exceptions.ThreadEvaluatorException; - -import java.io.File; -import java.util.concurrent.ConcurrentHashMap; -import java.util.LinkedList; -import java.util.Map.Entry; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; - -//JDK imports - -/** - * - * @author bfoster - * @version $Revision$ - * - * <p> - * Describe your class here - * </p>. - */ -public class DownloadThreadEvaluator { - private static Logger LOG = Logger.getLogger(DownloadThreadEvaluator.class.getName()); - private ConcurrentHashMap<File, DownloadingFileInfo> fileAndDownloadingFileInfo; - - private final int MAX_THREADS; - - private int currentThreadCount; - - private double[] downloadSpeedsForEachThread; - - public DownloadThreadEvaluator(int maxThreads) { - this.MAX_THREADS = maxThreads; - downloadSpeedsForEachThread = new double[maxThreads + 1]; - fileAndDownloadingFileInfo = new ConcurrentHashMap<File, DownloadingFileInfo>(); - currentThreadCount = 0; - } - - public synchronized void startTrackingDownloadRuntimeForFile(File file) - throws ThreadEvaluatorException { - long curTime = System.currentTimeMillis(); - if (++this.currentThreadCount > this.MAX_THREADS) { - throw new ThreadEvaluatorException( - "Number of threads exceeds max allows threads"); - } - updateThreadCounts(curTime); - fileAndDownloadingFileInfo.put(file, new DownloadingFileInfo(file, - curTime, this.currentThreadCount)); - } - - private void updateThreadCounts(long curTime) { - Set<Entry<File, DownloadingFileInfo>> entrySet = fileAndDownloadingFileInfo - .entrySet(); - for (Entry<File, DownloadingFileInfo> entry : entrySet) { - entry.getValue() - .updateThreadCount(curTime, this.currentThreadCount); - } - } - - public synchronized void cancelRuntimeTracking(File file) { - fileAndDownloadingFileInfo.remove(file); - currentThreadCount--; - 
updateThreadCounts(System.currentTimeMillis()); - } - - public synchronized void fileDownloadComplete(File file) - throws ThreadEvaluatorException { - try { - long finishTime = System.currentTimeMillis(); - DownloadingFileInfo dfi = fileAndDownloadingFileInfo.remove(file); - updateThreadCounts(finishTime); - LinkedList<TimeAndThreadCount> tatcList = dfi - .getTimeAndThreadInfo(); - long runtime = finishTime - dfi.getStartTimeInMillis(); - double total = 0; - long nextTime; - for (int i = 0; i < tatcList.size(); i++) { - TimeAndThreadCount tatc = tatcList.get(i); - if (i + 1 >= tatcList.size()) { - nextTime = finishTime; - } else { - nextTime = tatcList.get(i + 1).getStartTimeInMillis(); - } - long threadCountTime = nextTime - tatc.getStartTimeInMillis(); - total += ((double) (tatc.getThreadCount() * threadCountTime)) - / (double) runtime; - } - int avgThreadCountForFile = (int) Math.rint(total); - System.out.println("Recorded avg: " + avgThreadCountForFile); - - double downloadSpeed = (file.length() * avgThreadCountForFile) - / calculateRuntime(dfi.getStartTimeInMillis()); - double currentAvgSpeed = this.downloadSpeedsForEachThread[avgThreadCountForFile]; - if (currentAvgSpeed == 0) { - this.downloadSpeedsForEachThread[avgThreadCountForFile] = downloadSpeed; - } else { - this.downloadSpeedsForEachThread[avgThreadCountForFile] = (currentAvgSpeed + downloadSpeed) / 2; - } - } catch (Exception e) { - LOG.log(Level.SEVERE, e.getMessage()); - throw new ThreadEvaluatorException("Failed to register file " - + file + " as downloaded : " + e.getMessage()); - } finally { - currentThreadCount--; - } - } - - long calculateRuntime(final long startTime) { - return System.currentTimeMillis() - startTime; - } - - public synchronized int getRecommendedThreadCount() { - - int curRecThreadCount = 1; - double curMaxSpeed = this.downloadSpeedsForEachThread[curRecThreadCount]; - for (int i = 1; i < this.downloadSpeedsForEachThread.length; i++) { - double curSpeed = 
this.downloadSpeedsForEachThread[i]; - if (curSpeed > curMaxSpeed) { - curMaxSpeed = curSpeed; - curRecThreadCount = i; - } - } - - if (curRecThreadCount != this.MAX_THREADS - && this.downloadSpeedsForEachThread[curRecThreadCount + 1] == 0) { - curRecThreadCount++; - } else if (this.downloadSpeedsForEachThread[curRecThreadCount - 1] == 0) { - curRecThreadCount--; - } - - System.out.print("[ "); - for (double time : downloadSpeedsForEachThread) { - System.out.print(time + " "); - } - System.out.println("]"); - - System.out.println("Recommended Threads: " + curRecThreadCount); - - return curRecThreadCount; - } - -} http://git-wip-us.apache.org/repos/asf/oodt/blob/098cc4fa/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DownloadingFileInfo.java ---------------------------------------------------------------------- diff --git a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DownloadingFileInfo.java b/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DownloadingFileInfo.java deleted file mode 100644 index 555dbef..0000000 --- a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/DownloadingFileInfo.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.oodt.cas.pushpull.retrievalsystem; - -//JDK imports -import java.io.File; -import java.util.LinkedList; - -/** - * @author bfoster - * @version $Revision$ - * - * <p> - * Describe your class here - * </p>. - */ -public class DownloadingFileInfo { - - private File downloadingFile; - - private long startTimeInMillis; - - private LinkedList<TimeAndThreadCount> timeAndThreadCountList; - - public DownloadingFileInfo(File downloadingFile, long startTimeInMillis, - int currentThreadCount) { - timeAndThreadCountList = new LinkedList<TimeAndThreadCount>(); - this.downloadingFile = downloadingFile; - this.startTimeInMillis = startTimeInMillis; - this.timeAndThreadCountList.add(new TimeAndThreadCount( - startTimeInMillis, currentThreadCount)); - } - - public void updateThreadCount(long timeInMillis, int threadCount) { - this.timeAndThreadCountList.add(new TimeAndThreadCount(timeInMillis, - threadCount)); - } - - public File getDownloadingFile() { - return this.downloadingFile; - } - - public long getStartTimeInMillis() { - return this.startTimeInMillis; - } - - public LinkedList<TimeAndThreadCount> getTimeAndThreadInfo() { - return this.timeAndThreadCountList; - } -} http://git-wip-us.apache.org/repos/asf/oodt/blob/098cc4fa/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/FileRetrievalSystem.java ---------------------------------------------------------------------- diff --git a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/FileRetrievalSystem.java b/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/FileRetrievalSystem.java deleted file mode 100644 index 6088b49..0000000 --- a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/FileRetrievalSystem.java +++ /dev/null @@ -1,966 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.oodt.cas.pushpull.retrievalsystem; - -import org.apache.oodt.cas.pushpull.config.Config; -import org.apache.oodt.cas.pushpull.config.SiteInfo; -import org.apache.oodt.cas.pushpull.exceptions.AlreadyInDatabaseException; -import org.apache.oodt.cas.pushpull.exceptions.CrawlerException; -import org.apache.oodt.cas.pushpull.exceptions.RemoteConnectionException; -import org.apache.oodt.cas.pushpull.exceptions.ThreadEvaluatorException; -import org.apache.oodt.cas.pushpull.exceptions.ToManyFailedDownloadsException; -import org.apache.oodt.cas.pushpull.exceptions.UndefinedTypeException; -import org.apache.oodt.cas.pushpull.filerestrictions.renamingconventions.RenamingConvention; -import org.apache.oodt.cas.protocol.exceptions.ProtocolException; -import org.apache.oodt.cas.protocol.util.ProtocolFileFilter; -import org.apache.oodt.cas.protocol.Protocol; -import org.apache.oodt.cas.protocol.ProtocolFile; -import org.apache.oodt.cas.pushpull.protocol.ProtocolHandler; -import org.apache.oodt.cas.pushpull.protocol.RemoteSite; -import org.apache.oodt.cas.pushpull.protocol.RemoteSiteFile; -import org.apache.oodt.cas.filemgr.structs.exceptions.CatalogException; -import org.apache.oodt.cas.metadata.Metadata; -import 
org.apache.oodt.cas.metadata.util.MimeTypeUtils; - - -//JDK imports -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Vector; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; - -/** - * <pre> - * Will crawl external directory structures and will download the files within these structures. - * - * This class's settings are set using a java .properties file which can be read in and parsed by Config.java. - * This .properties file should have the following properties set: - * - * {@literal #list of sites to crawl - * protocol.external.sources=<path-to-xml-file> - * - * #protocol types - * protocolfactory.types=<list-of-protocols-separated-by-commas> (e.g. ftp,http,https,sftp) - * - * #Protocol factories per types (must have one for each protocol mention in protocolfactory.types -- the property must be name - * # as such: protocolfactory.<name-of-protocol-type> - * protocolfactory.ftp=<path-to-java-protocolfactory-class> (e.g. org.apache.oodt.cas.protocol.ftp.FtpClientFactory) - * protocolfactory.http=<path-to-java-protocolfactory-class> - * protocolfactory.https=<path-to-java-protocolfactory-class> - * protocolfactory.sftp=<path-to-java-protocolfactory-class> - * - * #configuration to make java.net.URL accept unsupported protocols -- must exist just as shown - * java.protocol.handler.pkgs=org.apache.oodt.cas.url.handlers - * } - * - * In order to specify which external sites to crawl you must create a XML file which contains the - * the site and necessary information needed to crawl the site, such as username and password. 
- * protocol.external.sources must contain the path to this file so the crawl knows where to find it. - * You can also train this class on how to crawl each given site. This is also specified in an XML - * file, whose path must be given in the first mentioned XML file which contains the username and password. - * - * The schema for the external sites XML file is as such: - * - * {@literal <sources> - * <source url="url-of-server"> - * <username>username</username> - * <password>password</password> - * <dirstruct>path-to-xml-file</dirstruct> - * <crawl>yes-or-no</crawl> - * </source> - * ... - * ... - * ... - * </sources\>} - * - * You may specify as many sources as you would like by specifying multiple {@literal <source>} tags. - * In the {@literal <source>} tag, the parameter 'url' must be specified. This is the url of the server - * you want the crawler to connect to. It should be of the following format: - * {@literal <protocol>://<host>} (e.g. sftp://remote.computer.gov) - * If no username and password exist, then these elements can be omitted (they are optional). - * For {@literal <crawl>} place yes or no here. This is for convenience of being able to keep record of the - * sites and their information in this XML file even if you decide that you no longer need to crawl it - * anymore (just put {@literal <crawl>no</crawl>} and the crawl will not crawl that site). - * {@literal <dirStruct>} contains a path to another XML file which is documented in DirStruct.java javadoc. This - * element is optional. If no {@literal <dirStruct>} is given, then every directory will be crawled on the site - * and every encountered file will be downloaded. 
- * </pre> - * - * @author bfoster (Brian Foster) - */ -public class FileRetrievalSystem { - - /* our log stream */ - private static final Logger LOG = Logger - .getLogger(FileRetrievalSystem.class.getName()); - - private final static int MAX_RETRIES = 3; - public static final int INT = 180; - public static final int TIMEOUT = 5000; - public static final int TIMEOUT1 = 5000; - public static final int TIMEOUT2 = 600; - - private LinkedList<ProtocolFile> failedDownloadList; - - private HashSet<ProtocolFile> currentlyDownloading; - - private int max_allowed_failed_downloads; - - /** - * The max number of threads able to run at the same time - */ - private int max_sessions; - - private final int absMaxAllowedSessions = 50; - - /** - * This is just for clarity purposes. . .I only create the amount of threads - * that I will allow to be used at any given moment - */ - private final static int EXTRA_LAZY_SESSIONS_TIMEOUT = 10; - - /** - * A list of created protocol sessions (devoted to grabbing files from the - * crawling directory structure) that are not presently in use. - */ - private Vector<Protocol> avaliableSessions; - - /** - * The number of sessions that have been created (should always be less than - * or equal to MAX_SESSIONS). - */ - private int numberOfSessions; - - /** - * The thread pool that is in charge of the sessions. - */ - private ThreadPoolExecutor threadController; - - /** - * Manages the Protocols and always ensures that the Crawler is using the - * appropriate protocol for any given server. - */ - private ProtocolHandler protocolHandler; - - /** - * max_sessions tracker - */ - private DownloadThreadEvaluator dtEval; - - private DownloadListener dListener; - - private Config config; - - private SiteInfo siteInfo; - - private HashSet<File> stagingAreas; - - private MimeTypeUtils mimeTypeDetection; - - /** - * Creates a Crawler based on the URL, DirStruct, and Config objects passed - * in. If no DirStruct is needed then set it to null. 
- * - * @param config - * The Configuration file that is passed to this objects - * ProtocolHandler. - * @throws InstantiationException - * @throws DatabaseException - */ - public FileRetrievalSystem(Config config, SiteInfo siteInfo) - throws InstantiationException { - try { - protocolHandler = new ProtocolHandler(config.getProtocolInfo()); - this.config = config; - this.siteInfo = siteInfo; - mimeTypeDetection = new MimeTypeUtils(config - .getProductTypeDetectionFile()); - } catch (Exception e) { - LOG.log(Level.SEVERE, e.getMessage()); - throw new InstantiationException( - "Failed to create FileRetrievalSystem : " + e.getMessage()); - } - } - - public void registerDownloadListener(DownloadListener dListener) { - this.dListener = dListener; - } - - public void initialize() throws IOException { - try { - resetVariables(); - } catch (Exception e) { - throw new IOException("Failed to initialize FileRetrievalSystem : " - + e.getMessage()); - } - } - - /** - * Initializes variables that must be reset when more than one crawl is done - * - * @throws ThreadEvaluatorException - */ - void resetVariables() { - numberOfSessions = 0; - stagingAreas = new HashSet<File>(); - avaliableSessions = new Vector<Protocol>(); - currentlyDownloading = new HashSet<ProtocolFile>(); - failedDownloadList = new LinkedList<ProtocolFile>(); - max_allowed_failed_downloads = config.getMaxFailedDownloads(); - max_sessions = config.getRecommendedThreadCount(); - threadController = new ThreadPoolExecutor(this.max_sessions, - this.max_sessions, EXTRA_LAZY_SESSIONS_TIMEOUT, - TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); - if (config.useTracker()) { - dtEval = new DownloadThreadEvaluator(this.absMaxAllowedSessions); - } - } - - /** - * reset error flag - */ - public void clearErrorFlag() { - max_allowed_failed_downloads += config.getMaxFailedDownloads(); - } - - public boolean isAlreadyInDatabase(RemoteFile rf) throws CatalogException { - return config.getIngester() != null && 
config.getIngester().hasProduct( - config.getFmUrl(), rf.getMetadata(RemoteFile.PRODUCT_NAME)); - } - - public List<RemoteSiteFile> getNextPage(final RemoteSiteFile dir, - final ProtocolFileFilter filter) throws RemoteConnectionException { - for (int i = 0; i < 3; i++) { - try { - return protocolHandler.nextPage(dir.getSite(), protocolHandler - .getAppropriateProtocol(dir, true, true), - new ProtocolFileFilter() { - @Override - public boolean accept(ProtocolFile file) { - return filter.accept(file) - && !FileRetrievalSystem.this - .isDownloading(file); - } - }); - } catch (Exception e) { - LOG.log(Level.WARNING, "Retrying to get next page for " + dir - + " because operation failed : " + e.getMessage(), e); - } - } - throw new RemoteConnectionException("Failed to get next page for " - + dir); - } - - public void changeToRoot(RemoteSite remoteSite) throws - org.apache.oodt.cas.protocol.exceptions.ProtocolException { - if (validate(remoteSite)) { - protocolHandler.cdToROOT(protocolHandler - .getAppropriateProtocolBySite(remoteSite, true)); - } else { - throw new ProtocolException("Not a valid remote site " + remoteSite); - } - } - - public void changeToHOME(RemoteSite remoteSite) throws ProtocolException { - if (validate(remoteSite)) { - protocolHandler.cdToHOME(protocolHandler - .getAppropriateProtocolBySite(remoteSite, true)); - } else { - throw new ProtocolException("Not a valid remote site " + remoteSite); - } - } - - public void changeToDir(String dir, RemoteSite remoteSite) - throws ProtocolException { - if (validate(remoteSite)) { - this - .changeToDir(protocolHandler.getProtocolFileFor(remoteSite, - protocolHandler.getAppropriateProtocolBySite( - remoteSite, true), dir, true)); - } else { - throw new ProtocolException("Not a valid remote site " + remoteSite); - } - } - - public void changeToDir(RemoteSiteFile pFile) throws ProtocolException { - RemoteSite remoteSite = pFile.getSite(); - if (validate(remoteSite)) { - 
protocolHandler.cd(protocolHandler.getAppropriateProtocolBySite( - remoteSite, true), pFile); - } else { - throw new ProtocolException("Not a valid remote site " + remoteSite); - } - } - - public ProtocolFile getHomeDir(RemoteSite remoteSite) - throws ProtocolException { - if (validate(remoteSite)) { - return protocolHandler.getHomeDir(remoteSite, protocolHandler - .getAppropriateProtocolBySite(remoteSite, true)); - } else { - throw new ProtocolException("Not a valid remote site " + remoteSite); - } - } - - public ProtocolFile getProtocolFile(RemoteSite remoteSite, String file, - boolean isDir) throws ProtocolException { - if (validate(remoteSite)) { - return protocolHandler.getProtocolFileFor(remoteSite, protocolHandler - .getAppropriateProtocolBySite(remoteSite, true), file, - isDir); - } else { - throw new ProtocolException("Not a valid remote site " + remoteSite); - } - } - - public ProtocolFile getCurrentFile(RemoteSite remoteSite) - throws ProtocolException { - if (validate(remoteSite)) { - return protocolHandler.pwd(remoteSite, protocolHandler - .getAppropriateProtocolBySite(remoteSite, true)); - } else { - throw new ProtocolException("Not a valid remote site " + remoteSite); - } - } - - // returns true if download was added to queue. . 
.false otherwise - public boolean addToDownloadQueue(RemoteSite remoteSite, String file, - String renamingString, File downloadToDir, - String uniqueMetadataElement, boolean deleteAfterDownload, Metadata fileMetadata) - throws ToManyFailedDownloadsException, RemoteConnectionException, - ProtocolException, - AlreadyInDatabaseException, UndefinedTypeException, - CatalogException, IOException { - if (validate(remoteSite)) { - if (!file.startsWith("/")) { - file = "/" + file; - } - return addToDownloadQueue(protocolHandler.getProtocolFileFor(remoteSite, - protocolHandler.getAppropriateProtocolBySite(remoteSite, - true), file, false), renamingString, downloadToDir, - uniqueMetadataElement, deleteAfterDownload, fileMetadata); - } else { - throw new ProtocolException("Not a valid remote site " + remoteSite); - } - } - - public boolean validate(RemoteSite remoteSite) { - Preconditions.checkNotNull(remoteSite); - LinkedList<RemoteSite> remoteSites = this.siteInfo - .getPossibleRemoteSites(remoteSite.getAlias(), remoteSite - .getURL(), remoteSite.getUsername(), remoteSite - .getPassword()); - if (remoteSites.size() == 1) { - RemoteSite rs = remoteSites.get(0); - remoteSite.copy(rs); - return true; - } - return false; - } - - public void waitUntilAllCurrentDownloadsAreComplete() - throws ProtocolException { - synchronized (this) { - for (int i = 0; i < INT; i++) { - try { - if (this.avaliableSessions.size() == this.numberOfSessions) { - return; - } else { - this.wait(TIMEOUT); - } - } catch (Exception ignored) { - } - } - throw new ProtocolException( - "Downloads appear to be hanging . . . aborting wait . . . 
waited for 15 minutes"); - } - } - - public boolean addToDownloadQueue(RemoteSiteFile file, - String renamingString, - File downloadToDir, - String uniqueMetadataElement, - boolean deleteAfterDownload, - Metadata fileMetadata) throws ToManyFailedDownloadsException, - RemoteConnectionException, - AlreadyInDatabaseException, - UndefinedTypeException, - CatalogException, - IOException { - if (this.failedDownloadList.size() > max_allowed_failed_downloads) { - throw new ToManyFailedDownloadsException( - "Number of failed downloads exceeds " - + max_allowed_failed_downloads - + " . . . blocking all downloads from being added to queue . . . " - + "reset error flag in order to force allow downloads into queue"); - } - if (this.isDownloading(file)) { - LOG.log(Level.WARNING, "Skipping file '" + file - + "' because it is already on the download queue"); - return false; - } - - RemoteFile remoteFile = new RemoteFile(file); - remoteFile.addMetadata(fileMetadata); - remoteFile.addMetadata(RemoteFile.RENAMING_STRING, renamingString); - remoteFile.addMetadata(RemoteFile.DELETE_AFTER_DOWNLOAD, - deleteAfterDownload + ""); - - if (config.onlyDownloadDefinedTypes()) { - String mimeType = this.mimeTypeDetection.getMimeType(file.getName()); - if (mimeType != null - && !mimeType.equals("application/octet-stream")) { - remoteFile.addMetadata(RemoteFile.MIME_TYPE, mimeType); - remoteFile.addMetadata(RemoteFile.SUPER_TYPE, this.mimeTypeDetection - .getSuperTypeForMimeType(mimeType)); - String description = this.mimeTypeDetection - .getDescriptionForMimeType(mimeType); - if (!Strings.isNullOrEmpty(description)) { - if(description.contains("&")){ - for (String field : description.split("\\&\\&")) { - String[] keyval = field.split("\\="); - remoteFile.addMetadata(keyval[0].trim(), keyval[1].trim()); - } - } else{ - // it's the ProductType - remoteFile.addMetadata(RemoteFile.PRODUCT_TYPE, description); - } - if (remoteFile.getMetadata(RemoteFile.UNIQUE_ELEMENT) != null) { - 
uniqueMetadataElement = remoteFile.getMetadata(RemoteFile.UNIQUE_ELEMENT); - } - } - } else { - throw new UndefinedTypeException("File '" + file - + "' is not a defined type"); - } - } - - downloadToDir = new File(downloadToDir.isAbsolute() ? downloadToDir - .getAbsolutePath() : this.config.getBaseStagingArea() + "/" - + downloadToDir.getPath()); - if (!this.isStagingAreaInitialized(downloadToDir)) { - this.initializeStagingArea(downloadToDir); - } - - remoteFile.addMetadata(RemoteFile.DOWNLOAD_TO_DIR, downloadToDir.getAbsolutePath()); - - if (remoteFile.getMetadata(RemoteFile.PRODUCT_NAME_GENERATOR) != null) { - remoteFile.addMetadata(RemoteFile.PRODUCT_NAME, RenamingConvention.rename(remoteFile, remoteFile.getMetadata(RemoteFile.PRODUCT_NAME_GENERATOR))); - }else { - remoteFile.setUniqueMetadataElement(uniqueMetadataElement == null ? RemoteFile.FILENAME : uniqueMetadataElement); - } - - if (!isAlreadyInDatabase(remoteFile)) { - - // get download location - File newFile = getSaveToLoc(remoteFile); - - // add session to thread pool - if (!this.isInStagingArea(newFile)) { - for (int retries = 0;; retries++) { - try { - addSessionToThreadPool( - getNextAvaliableSession(remoteFile - .getProtocolFile()), remoteFile, - newFile); - return true; - } catch (Exception e) { - if (retries < MAX_RETRIES) { - LOG.log(Level.WARNING, "Failed to get session for " - + file + " . . . 
retrying in 5 secs"); - synchronized (this) { - try { - wait(TIMEOUT1); - } catch (Exception ignored) { - } - } - } else { - this.failedDownloadList.add(file); - throw new RemoteConnectionException( - "Failed to get session to download " + file - + " : " + e.getMessage(), e); - } - } - } - } else { - if (deleteAfterDownload) { - try { - protocolHandler - .delete(protocolHandler.getAppropriateProtocol( - file, true, true), file); - } catch (Exception e) { - LOG.log(Level.SEVERE, - "Failed to delete file from server : " - + e.getMessage()); - } - } - LOG.log(Level.WARNING, "Skipping file " + file - + " because it is already in staging area"); - return false; - } - } else { - throw new AlreadyInDatabaseException("File " + file - + " is already the database"); - } - } - - private boolean isStagingAreaInitialized(File stagingArea) { - return this.stagingAreas.contains(stagingArea); - } - - private boolean isInStagingArea(final File findFile) { - return (findFile.exists() || new File(findFile.getParentFile(), - "Downloading_" + findFile.getName()).exists()); - } - - private void initializeStagingArea(File stagingArea) throws IOException { - LOG.log(Level.INFO, "Preparing staging area " + stagingArea); - if (stagingArea.exists()) { - File[] failedDownloads = stagingArea.listFiles(new FileFilter() { - @Override - public boolean accept(File pathname) { - return pathname.getName().startsWith("Downloading_"); - } - }); - if(failedDownloads!=null) { - for (File file : failedDownloads) { - LOG.log(Level.INFO, "Removing failed download file " - + file.getAbsolutePath()); - file.delete(); - } - } - } else { - LOG.log(Level.INFO, "Staging area " + stagingArea.getAbsolutePath() - + " does not exist! 
-- trying to create it "); - if (!stagingArea.mkdirs()) { - throw new IOException("Failed to create staging area at " - + stagingArea.getAbsolutePath()); - } - } - this.stagingAreas.add(stagingArea); - } - - File getSaveToLoc(RemoteFile remoteFile) { - String renamingString = remoteFile - .getMetadata(RemoteFile.RENAMING_STRING); - if (renamingString == null || renamingString.equals("")) { - return new File(remoteFile.getMetadata(RemoteFile.DOWNLOAD_TO_DIR) - + "/" + remoteFile.getMetadata(RemoteFile.FILENAME)); - } else { - File newFile = new File(remoteFile - .getMetadata(RemoteFile.DOWNLOAD_TO_DIR) - + "/" - + RenamingConvention.rename(remoteFile, renamingString)); - if (!newFile.getParentFile().equals( - remoteFile.getMetadata(RemoteFile.DOWNLOAD_TO_DIR))) { - newFile.getParentFile().mkdirs(); - } - return newFile; - } - } - - Protocol getNextAvaliableSession(RemoteSiteFile file) throws CrawlerException { - // wait for available session, then load it - Protocol session; - while ((session = getSession(file)) == null) { - try { - waitMainThread(); - } catch (InterruptedException ignored) { - } - } - return session; - } - - /** - * Sleeps the crawling thread - * - * @throws InterruptedException - */ - synchronized void waitMainThread() throws InterruptedException { - wait(); - } - - /** - * Wakes up the crawling thread - */ - synchronized void wakeUpMainThread() { - notify(); - } - - /** - * Increments the number of downloading session - */ - synchronized void incrementSessions() { - numberOfSessions++; - } - - synchronized void decrementSessions() { - this.numberOfSessions--; - } - - /** - * Gets an available downloading session Protocol. 
Returns null if none are - * available - * - * @return The found downloading session Protocol - * @throws RemoteCommunicationException - * If downloading session Protocol has to be reconnected and - * there is an error communicating with the server - */ - synchronized Protocol getSession(RemoteSiteFile file) throws CrawlerException { - try { - Protocol session = null; - if (file.getSite().getMaxConnections() < 0 - || file.getSite().getMaxConnections() > this.getCurrentlyDownloadingFiles().size()) { - if (avaliableSessions.size() > 0) { - session = modifyAvailableSessionForPath(file); - } else if (numberOfSessions < max_sessions) { - session = createNewSessionForPath(file); - incrementSessions(); - } - } - return session; - } catch (Exception e) { - throw new CrawlerException("Failed to get new session : " - + e.getMessage(), e); - } - } - - Protocol createNewSessionForPath(RemoteSiteFile file) - throws RemoteConnectionException { - return protocolHandler.getAppropriateProtocol(file, /* reuse */false, /* navigate */ - true); - } - - Protocol modifyAvailableSessionForPath(RemoteSiteFile file) - throws RemoteConnectionException { - Protocol session = getAvailableSession(); - if (!file.getSite().getURL().getHost().equals( - file.getSite().getURL().getHost()) - || !protocolHandler.isProtocolConnected(session)) { - protocolHandler.disconnect(session); - session = protocolHandler.getAppropriateProtocol(file, /* reuse */ - false, /* navigate */true); - } else { - try { - if (file.isDir()) { - protocolHandler.cd(session, file); - } else { - protocolHandler.cd(session, - new RemoteSiteFile(file.getParent(), file.getSite())); - } - } catch (Exception e) { - LOG.log(Level.SEVERE, e.getMessage()); - try { - protocolHandler.disconnect(session); - } catch (Exception ignored) { - } - session = protocolHandler.getAppropriateProtocol(file, /* reuse */ - false, /* navigate */true); - } - } - return session; - } - - /** - * Puts a session in the available session list - * - * @param 
session - * The Protocol session to be added to the available list - */ - synchronized void addAvailableSession(Protocol session) { - avaliableSessions.add(session); - } - - /** - * Removes a session from the available list and returns it - * - * @return An available downloading Protocol session - */ - synchronized Protocol getAvailableSession() { - return avaliableSessions.remove(0); - } - - synchronized int getNumberOfUsedSessions() { - return numberOfSessions - avaliableSessions.size(); - } - - /** - * Registers a downloading session with the threadpoolexecutor to begin - * downloading the specified ProtocolFile to the local File location - * - * @param session - * The downloading Protocol session to be used to download the - * ProtocolFile - * @param newFile - * The location which the downloaded file will be stored - */ - void addSessionToThreadPool(final Protocol session, - final RemoteFile remoteFile, final File newFile) { - this.addToDownloadingList(remoteFile.getProtocolFile()); - threadController.execute(new Runnable() { - @Override - public void run() { - boolean successful = false; - int retries = 0; - Protocol curSession = session; - - if (FileRetrievalSystem.this.dListener != null) { - FileRetrievalSystem.this.dListener - .downloadStarted(remoteFile.getProtocolFile()); - } - - // try until successful or all retries have been used - do { - try { - // if thread tracker is to be used - if (config.useTracker()) { - dtEval.startTrackingDownloadRuntimeForFile(newFile); - protocolHandler.download(curSession, remoteFile - .getProtocolFile(), newFile, remoteFile - .getMetadata( - RemoteFile.DELETE_AFTER_DOWNLOAD) - .equals("true")); - dtEval.fileDownloadComplete(newFile); - threadController - .setCorePoolSize(max_sessions = dtEval - .getRecommendedThreadCount()); - threadController.setMaximumPoolSize(max_sessions); - // if static number of threads are to be used - } else { - protocolHandler.download(curSession, remoteFile - .getProtocolFile(), newFile, 
remoteFile - .getMetadata( - RemoteFile.DELETE_AFTER_DOWNLOAD) - .equals("true")); - } - - successful = true; - if (FileRetrievalSystem.this.dListener != null) { - FileRetrievalSystem.this.dListener - .downloadFinished(remoteFile - .getProtocolFile()); - } - - remoteFile.addMetadata(RemoteFile.FILE_SIZE, newFile - .length() - + ""); - - // try to create the metadata file - if (config.getWriteMetFile()) { - try { - LOG.log(Level.INFO, "Writing metadata file for '" + newFile + "'"); - remoteFile.addMetadata(RemoteFile.FILE_SIZE, - newFile.length() + ""); - remoteFile.writeToPropEqValFile(newFile - .getAbsolutePath() - + "." + config.getMetFileExtension(), - config.getListOfMetadataToOutput()); - } catch (Exception e) { - LOG.log(Level.SEVERE, - "Failed to create metadata file for " - + remoteFile.getProtocolFile()); - } - } - - } catch (Exception e) { - - // if tracker is being used cancel tracking - if (config.useTracker()) { - dtEval.cancelRuntimeTracking(newFile); - } - - // delete any created file from staging area - newFile.delete(); - new File(newFile.getAbsolutePath() + "." 
- + config.getMetFileExtension()).delete(); - - // check if a retry is still allowed - if (++retries > MAX_RETRIES) { - FileRetrievalSystem.this.failedDownloadList - .add(remoteFile.getProtocolFile()); - LOG.log(Level.SEVERE, "Failed to download " - + remoteFile.getProtocolFile() + " : " - + e.getMessage()); - if (FileRetrievalSystem.this.dListener != null) { - FileRetrievalSystem.this.dListener - .downloadFailed(remoteFile - .getProtocolFile(), e - .getMessage()); - } - break; - } else if (FileRetrievalSystem.this.failedDownloadList - .size() < max_allowed_failed_downloads) { - // discard current session and recreate a new - // session to try to download file with - LOG.log(Level.WARNING, "Retrying to download file " - + remoteFile.getProtocolFile() - + " because download failed : " - + e.getMessage(), e); - try { - protocolHandler.disconnect(curSession); - } catch (Exception ignored) { - } - try { - curSession = protocolHandler - .getAppropriateProtocol(remoteFile - .getProtocolFile(), false, true); - } catch (Exception exc) { - LOG.log(Level.SEVERE, - "Failed to reconnect protocol to retry download of file " - + remoteFile.getProtocolFile() - + " -- aborting retry : " - + e.getMessage(), e); - } - } else { - LOG - .log( - Level.SEVERE, - "Terminating download tries for file " - + remoteFile - .getProtocolFile() - + " do to too many previous download failures : " - + e.getMessage(), e); - if (FileRetrievalSystem.this.dListener != null) { - FileRetrievalSystem.this.dListener - .downloadFailed(remoteFile - .getProtocolFile(), e - .getMessage()); - } - break; - } - - } - } while (!successful); - - FileRetrievalSystem.this.removeFromDownloadingList(remoteFile - .getProtocolFile()); - determineSessionFate(curSession); - } - }); - } - - private synchronized void addToDownloadingList(ProtocolFile pFile) { - this.currentlyDownloading.add(pFile); - } - - private synchronized void removeFromDownloadingList(ProtocolFile pFile) { - this.currentlyDownloading.remove(pFile); - 
} - - public synchronized boolean isDownloading(ProtocolFile pFile) { - return this.currentlyDownloading.contains(pFile); - } - - public synchronized LinkedList<ProtocolFile> getCurrentlyDownloadingFiles() { - LinkedList<ProtocolFile> list = new LinkedList<ProtocolFile>(); - list.addAll(this.currentlyDownloading); - return list; - } - - public LinkedList<ProtocolFile> getListOfFailedDownloads() { - return this.failedDownloadList; - } - - public void clearFailedDownloadsList() { - this.failedDownloadList.clear(); - } - - synchronized void determineSessionFate(Protocol session) { - // determine whether thread should be keep or should be thrown away - if (numberOfSessions <= max_sessions) { - giveBackSession(session); - } else { - disposeOfSession(session); - } - } - - void giveBackSession(Protocol session) { - addAvailableSession(session); - wakeUpMainThread(); - } - - void disposeOfSession(Protocol session) { - try { - protocolHandler.disconnect(session); - } catch (Exception e) { - // log failure - } - numberOfSessions--; - } - - public void shutdown() { - try { - // close out threadpool - threadController.shutdown(); - // give a max of 10 minutes to finish downloading any files - threadController.awaitTermination(TIMEOUT2, TimeUnit.SECONDS); - } catch (Exception e) { - // log failure - } - - try { - this.resetVariables(); - } catch (Exception ignored) { - - } - - try { - closeSessions(); - } catch (Exception e) { - // log failure!!! - } - - try { - protocolHandler.close(); - } catch (Exception e) { - // log failure!!! - } - - } - - /** - * Disconnects all downloading Protocol sessions in the avaiableSessions - * list. The ThreadPoolExecutor needs to be completely shutdown before this - * method should be called. Otherwise some Protocols might not be - * disconnected or left downloading. 
- * - * @return True if successful, false otherwise - * @throws RemoteConnectionException - */ - public boolean closeSessions() throws RemoteConnectionException { - for (Protocol session : avaliableSessions) { - protocolHandler.disconnect(session); - } - avaliableSessions.clear(); - numberOfSessions = 0; - return true; - } -} http://git-wip-us.apache.org/repos/asf/oodt/blob/098cc4fa/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/RemoteFile.java ---------------------------------------------------------------------- diff --git a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/RemoteFile.java b/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/RemoteFile.java deleted file mode 100644 index 68b0dcb..0000000 --- a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/RemoteFile.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.oodt.cas.pushpull.retrievalsystem; - -//OODT imports -import org.apache.oodt.cas.metadata.Metadata; -import org.apache.oodt.cas.metadata.SerializableMetadata; -import org.apache.oodt.cas.pushpull.protocol.RemoteSiteFile; - -//JDK imports -import java.io.FileOutputStream; -import java.io.IOException; - -/** - * @author bfoster - * @author mattmann - * @version $Revision$ - * - * <p> - * Describe your class here - * </p>. - */ -public class RemoteFile implements RemoteFileMetKeys { - - private Metadata metadata; - - private RemoteSiteFile pFile; - - public RemoteFile(RemoteSiteFile pFile) { - this.pFile = pFile; - this.metadata = new Metadata(); - this.metadata.addMetadata(RETRIEVED_FROM_LOC, pFile.getPath()); - this.metadata.addMetadata(FILENAME, pFile.getName()); - this.metadata.addMetadata(DATA_PROVIDER, pFile.getSite().getURL().getHost()); - } - - public void setUniqueMetadataElement(String uniqueMetadataElement) { - this.metadata.addMetadata(PRODUCT_NAME, this - .getMetadata(uniqueMetadataElement)); - } - - public void addMetadata(String key, String value) { - this.metadata.addMetadata(key, value); - } - - public void addMetadata(Metadata metadata) { - this.metadata.addMetadata(metadata); - } - - public String getMetadata(String key) { - return this.metadata.getMetadata(key); - } - - public Metadata getAllMetadata() { - return this.metadata; - } - - public RemoteSiteFile getProtocolFile() { - return this.pFile; - } - - public void writeToPropEqValFile(String filePath, - String[] metadataToWriteOut) throws IOException { - try { - SerializableMetadata sMetadata = new SerializableMetadata("UTF-8", - false); - for (String metadataKey : metadataToWriteOut) { - if (this.metadata.getMetadata(metadataKey) != null - && !this.metadata.getMetadata(metadataKey).equals("")) { - sMetadata.addMetadata(metadataKey, this.metadata - .getMetadata(metadataKey)); - } - } - sMetadata.writeMetadataToXmlStream(new FileOutputStream(filePath)); - } catch 
(Exception e) { - throw new IOException("Failed to write metadata file for " - + this.pFile + " : " + e.getMessage()); - } - } - -} http://git-wip-us.apache.org/repos/asf/oodt/blob/098cc4fa/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/RemoteFileMetKeys.java ---------------------------------------------------------------------- diff --git a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/RemoteFileMetKeys.java b/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/RemoteFileMetKeys.java deleted file mode 100644 index 1ad95e8..0000000 --- a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/RemoteFileMetKeys.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.oodt.cas.pushpull.retrievalsystem; - -/** - * @author mattmann - * @version $Revision$ - * - * <p> - * Met keys needed by the {@link RemoteFile} - * </p>. 
- */ -public interface RemoteFileMetKeys { - - String PRODUCT_NAME = "ProductName"; - - String RETRIEVED_FROM_LOC = "RetrievedFromLoc"; - - String FILENAME = "Filename"; - - String DATA_PROVIDER = "DataProvider"; - - String FILE_SIZE = "FileSize"; - - String RENAMING_STRING = "RenamingString"; - - String DOWNLOAD_TO_DIR = "DownloadToDir"; - - String PRODUCT_TYPE = "ProductType"; - - String MIME_TYPE = "MimeType"; - - String SUPER_TYPE = "SuperType"; - - String DELETE_AFTER_DOWNLOAD = "DeleteAfterDownload"; - - String PRODUCT_NAME_GENERATOR = "ProductNameGenerator"; - - String UNIQUE_ELEMENT = "UniqueElement"; - -} http://git-wip-us.apache.org/repos/asf/oodt/blob/098cc4fa/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/RetrievalSetup.java ---------------------------------------------------------------------- diff --git a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/RetrievalSetup.java b/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/RetrievalSetup.java deleted file mode 100644 index 3113620..0000000 --- a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/RetrievalSetup.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.oodt.cas.pushpull.retrievalsystem; - -//OODT imports -import org.apache.oodt.cas.pushpull.config.*; -import org.apache.oodt.cas.pushpull.exceptions.ParserException; -import org.apache.oodt.cas.pushpull.filerestrictions.Parser; -import org.apache.oodt.cas.pushpull.objectfactory.PushPullObjectFactory; -import org.apache.oodt.cas.pushpull.retrievalmethod.RetrievalMethod; - -//JDK imports -import java.io.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.logging.Level; -import java.util.logging.Logger; - - - -/** - * - * @author bfoster - * @version $Revision$ - * - * <p> - * Describe your class here - * </p>. - */ -public class RetrievalSetup { - - public static final int TIMEOUT = 5000; - private final Config config; - - private final HashSet<File> alreadyProcessedPropFiles; - - private final ConcurrentHashMap<Class<RetrievalMethod>, RetrievalMethod> classToRmMap; - - private boolean downloadingProps; - - private final SiteInfo siteInfo; - - private final DataFileToPropFileLinker linker; - - private final static Logger LOG = Logger.getLogger(RetrievalSetup.class - .getName()); - - public RetrievalSetup(Config config, SiteInfo siteInfo) { - this.downloadingProps = false; - this.config = config; - this.siteInfo = siteInfo; - alreadyProcessedPropFiles = new HashSet<File>(); - classToRmMap = new ConcurrentHashMap<Class<RetrievalMethod>, RetrievalMethod>(); - linker = new DataFileToPropFileLinker(); - } - - public void retrieveFiles(PropFilesInfo pfi, final DataFilesInfo dfi) { - - FileRetrievalSystem dataFilesFRS = null; - try { - this.startPropFileDownload(pfi); - - (dataFilesFRS = new FileRetrievalSystem(config, siteInfo)) - .initialize(); - dataFilesFRS.registerDownloadListener(linker); - - File[] propFiles; - while ((propFiles = 
getCurrentlyDownloadedPropFiles(pfi)).length > 0 - || downloadingProps) { - for (File propFile : propFiles) { - try { - Parser parser = pfi.getParserForFile(propFile); - Class<RetrievalMethod> rmClass = config.getParserInfo() - .getRetrievalMethod(parser); - RetrievalMethod rm; - if ((rm = this.classToRmMap.get(rmClass)) == null) { - LOG.log(Level.INFO, "Creating '" - + rmClass.getCanonicalName() - + "' to download data files"); - rm = PushPullObjectFactory - .createNewInstance(rmClass); - this.classToRmMap.put(rmClass, rm); - } - rm.processPropFile(dataFilesFRS, parser, propFile, dfi, - linker); - } catch (ParserException e) { - LOG.log(Level.SEVERE, "Failed to parse property file " - + propFile + " : " + e.getMessage(), e); - linker.markAsFailed(propFile, - "Failed to parse property file " + propFile - + " : " + e.getMessage()); - } catch (Exception e) { - LOG.log(Level.SEVERE, - "Failed to finish downloading per property files " - + propFile.getAbsolutePath() + " : " - + e.getMessage(), e); - linker.markAsFailed(propFile, - "Error while downloading per property file " - + propFile.getAbsolutePath() + " : " - + e.getMessage()); - } - } - - dataFilesFRS.waitUntilAllCurrentDownloadsAreComplete(); - - for (File propFile : propFiles) { - try { - if (pfi.getLocalDir().equals(pfi.getOnSuccessDir()) - || pfi.getLocalDir().equals(pfi.getOnFailDir())) { - alreadyProcessedPropFiles.add(propFile); - } - this.movePropsFileToFinalDestination(pfi, propFile, - linker.getErrorsAndEraseLinks(propFile)); - } catch (Exception e) { - LOG.log(Level.SEVERE, e.getMessage()); - LOG.log(Level.SEVERE, - "Error occurred while writing errors to error dir for file '" - + propFile + "' : " + e.getMessage()); - } - } - } - - } catch (Exception e) { - LOG.log(Level.SEVERE, e.getMessage()); - } finally { - if (dataFilesFRS != null) { - dataFilesFRS.shutdown(); - } - alreadyProcessedPropFiles.clear(); - linker.clear(); - } - } - - private void startPropFileDownload(final PropFilesInfo pfi) { - if 
(pfi.needsToBeDownloaded()) { - this.downloadingProps = true; - new Thread(new Runnable() { - - @Override - public void run() { - FileRetrievalSystem frs = null; - try { - (frs = new FileRetrievalSystem( - RetrievalSetup.this - .createPropFilesConfig(config - .getProtocolInfo()), siteInfo)) - .initialize(); - - LinkedList<File> propDirStructFiles = pfi - .getDownloadInfoPropFiles(); - for (File dirStructFile : propDirStructFiles) { - Parser parser = pfi.getParserForFile(dirStructFile); - Class<RetrievalMethod> rmClass = config - .getParserInfo().getRetrievalMethod(parser); - RetrievalMethod rm; - if ((rm = RetrievalSetup.this.classToRmMap - .get(rmClass)) == null) { - LOG.log(Level.INFO, "Creating '" - + rmClass.getCanonicalName() - + "' to download property files"); - rm = PushPullObjectFactory - .createNewInstance(rmClass); - RetrievalSetup.this.classToRmMap.put(rmClass, - rm); - } - rm.processPropFile(frs, parser, dirStructFile, - new DataFilesInfo(null, pfi - .getDownloadInfo()), linker); - } - } catch (Exception e) { - LOG.log(Level.SEVERE, e.getMessage()); - } finally { - if (frs != null) { - frs.shutdown(); - } - RetrievalSetup.this.downloadingProps = false; - } - } - - }).start(); - - // give property file download a 5 sec head start - LOG - .log( - Level.INFO, - "Waiting data download thread for 5 secs to give the property files download thread a head start"); - synchronized (this) { - try { - this.wait(TIMEOUT); - } catch (Exception ignored) { - } - } - } - } - - private File[] getCurrentlyDownloadedPropFiles(final PropFilesInfo pfi) { - File[] files = pfi.getLocalDir().listFiles(new FileFilter() { - @Override - public boolean accept(File pathname) { - return pfi.getParserForFile(pathname) != null - && !(pathname.getName().startsWith("Downloading_") - || pathname.getName().endsWith( - RetrievalSetup.this.config - .getMetFileExtension()) || alreadyProcessedPropFiles - .contains(pathname)); - } - }); - return files == null ? 
new File[0] : files; - } - - private Config createPropFilesConfig(ProtocolInfo pi) { - Config propConfig = this.config.clone(); - ProtocolInfo propPI = pi.clone(); - propPI.setPageSize(-1); - propConfig.setProtocolInfo(propPI); - propConfig.setUseTracker(false); - propConfig.setIngester(null); - propConfig.setOnlyDownloadDefinedTypes(false); - return propConfig; - } - - private void movePropsFileToFinalDestination(PropFilesInfo pfi, - File dirstructFile, String errorMsgs) throws IOException { - File metFile = new File( - String.format("%s.%s", dirstructFile.getAbsolutePath(), config.getMetFileExtension())); - if (pfi.getDeleteOnSuccess() && errorMsgs == null) { - dirstructFile.delete(); - metFile.delete(); - return; - } - File moveToDir = pfi.getFinalDestination(errorMsgs == null); - moveToDir.mkdirs(); - File newLoc = new File(moveToDir, dirstructFile.getName()); - dirstructFile.renameTo(newLoc); - metFile.renameTo(new File( - String.format("%s.%s", newLoc.getAbsolutePath(), config.getMetFileExtension()))); - if (errorMsgs != null) { - File errorFile = new File(newLoc.getParentFile(), dirstructFile - .getName() - + ".errors"); - errorFile.createNewFile(); - - PrintStream ps = new PrintStream(new FileOutputStream(errorFile)); - ps.print(errorMsgs); - ps.println(); - ps.close(); - } - } - -} http://git-wip-us.apache.org/repos/asf/oodt/blob/098cc4fa/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/TimeAndThreadCount.java ---------------------------------------------------------------------- diff --git a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/TimeAndThreadCount.java b/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/TimeAndThreadCount.java deleted file mode 100644 index f8f3985..0000000 --- a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/retrievalsystem/TimeAndThreadCount.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor 
license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.oodt.cas.pushpull.retrievalsystem; - -/** - * @author bfoster - * @version $Revision$ - * - * <p> - * Describe your class here - * </p>. - */ -public class TimeAndThreadCount { - - private long startTimeInMillis; - - private int threadCount; - - public TimeAndThreadCount(long startTimeInMillis, int threadCount) { - this.startTimeInMillis = startTimeInMillis; - this.threadCount = threadCount; - } - - public long getStartTimeInMillis() { - return this.startTimeInMillis; - } - - public int getThreadCount() { - return this.threadCount; - } - -} http://git-wip-us.apache.org/repos/asf/oodt/blob/098cc4fa/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/imaps/Handler.java ---------------------------------------------------------------------- diff --git a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/imaps/Handler.java b/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/imaps/Handler.java deleted file mode 100644 index c5ed8c2..0000000 --- a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/imaps/Handler.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.oodt.cas.pushpull.url.handlers.imaps; - -//JDK imports -import java.io.IOException; -import java.net.URL; -import java.net.URLConnection; -import java.net.URLStreamHandler; - -/** - * - * @author bfoster - * @version $Revision$ - * - * <p>Describe your class here</p>. - */ -public class Handler extends URLStreamHandler { - - @Override - protected URLConnection openConnection(URL url) throws IOException { - return new ImapsURLConnection(url); - } - -} http://git-wip-us.apache.org/repos/asf/oodt/blob/098cc4fa/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/imaps/ImapsURLConnection.java ---------------------------------------------------------------------- diff --git a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/imaps/ImapsURLConnection.java b/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/imaps/ImapsURLConnection.java deleted file mode 100644 index 6e1d792..0000000 --- a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/imaps/ImapsURLConnection.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.oodt.cas.pushpull.url.handlers.imaps; - -//JDK imports -import java.io.IOException; -import java.net.URL; -import java.net.URLConnection; - -/** - * - * @author bfoster - * @version $Revision$ - * - * <p> - * Describe your class here - * </p>. - */ -public class ImapsURLConnection extends URLConnection { - - protected ImapsURLConnection(URL url) { - super(url); - } - - @Override - public void connect() throws IOException { - // do nothing - } - -} http://git-wip-us.apache.org/repos/asf/oodt/blob/098cc4fa/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/sftp/Handler.java ---------------------------------------------------------------------- diff --git a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/sftp/Handler.java b/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/sftp/Handler.java deleted file mode 100644 index e80a030..0000000 --- a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/sftp/Handler.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.oodt.cas.pushpull.url.handlers.sftp; - -//JDK imports -import java.io.IOException; -import java.net.URL; -import java.net.URLConnection; -import java.net.URLStreamHandler; - -/** - * - * @author bfoster - * @version $Revision$ - * - * <p>Describe your class here</p>. - */ -public class Handler extends URLStreamHandler { - - @Override - protected URLConnection openConnection(URL url) throws IOException { - return new SftpURLConnection(url); - } - -} http://git-wip-us.apache.org/repos/asf/oodt/blob/098cc4fa/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/sftp/SftpURLConnection.java ---------------------------------------------------------------------- diff --git a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/sftp/SftpURLConnection.java b/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/sftp/SftpURLConnection.java deleted file mode 100644 index ff63cdc..0000000 --- a/pushpull/src/main/java/org/apache/oodt/cas/pushpull/url/handlers/sftp/SftpURLConnection.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.oodt.cas.pushpull.url.handlers.sftp; - -//JDK imports -import java.io.IOException; -import java.net.URL; -import java.net.URLConnection; - -/** - * - * @author bfoster - * @version $Revision$ - * - * <p>Describe your class here</p>. - */ -public class SftpURLConnection extends URLConnection { - - protected SftpURLConnection(URL url) { - super(url); - } - - @Override - public void connect() throws IOException { - // do nothing!!! - } - -}
