check for filesystem
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/e7eabe3d Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/e7eabe3d Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/e7eabe3d Branch: refs/heads/steven/hdfs Commit: e7eabe3d84cc3efdd998c44885cc7b3f96d826fa Parents: 89c4ac7 Author: efikalti <[email protected]> Authored: Wed Aug 19 18:38:14 2015 +0300 Committer: efikalti <[email protected]> Committed: Wed Aug 19 18:38:14 2015 +0300 ---------------------------------------------------------------------- .../VXQueryCollectionOperatorDescriptor.java | 39 +++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/e7eabe3d/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index a930457..9bdf34c 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -114,28 +114,31 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO public void nextFrame(ByteBuffer buffer) throws HyracksDataException { fta.reset(buffer); String collectionModifiedName = collectionName.replace("${nodeId}", nodeId); - - File collectionDirectory = new File(collectionModifiedName); - //check if directory is in the local file system - if (collectionDirectory.exists()) { - // Go through each tuple. - if (collectionDirectory.isDirectory()) { - for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { - Iterator<File> it = FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(), - TrueFileFilter.INSTANCE); - while (it.hasNext()) { - File xmlDocument = it.next(); - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Starting to read XML document: " + xmlDocument.getAbsolutePath()); + if (!collectionModifiedName.contains("hdfs:/")) { + File collectionDirectory = new File(collectionModifiedName); + //check if directory is in the local file system + if (collectionDirectory.exists()) { + System.out.println("local"); + // Go through each tuple. + if (collectionDirectory.isDirectory()) { + for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { + Iterator<File> it = FileUtils.iterateFiles(collectionDirectory, + new VXQueryIOFileFilter(), TrueFileFilter.INSTANCE); + while (it.hasNext()) { + File xmlDocument = it.next(); + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("Starting to read XML document: " + xmlDocument.getAbsolutePath()); + } + parser.parseElements(xmlDocument, writer, fta, tupleIndex); } - parser.parseElements(xmlDocument, writer, fta, tupleIndex); } + } else { + throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" + + collectionDirectory.getAbsolutePath() + ") passed to collection."); } - } else { - throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" - + collectionDirectory.getAbsolutePath() + ") passed to collection."); } } else { + collectionModifiedName = collectionModifiedName.replaceAll("hdfs:/", ""); // Else check in HDFS file system // Get instance of the HDFS filesystem FileSystem fs = hdfs.getFileSystem(); @@ -222,7 +225,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO } } else { throw new HyracksDataException("Invalid HDFS directory parameter (" + nodeId + ":" - + collectionDirectory + ") passed to collection."); + + directory + ") passed to collection."); } } catch (FileNotFoundException e) { System.err.println(e);
