Split a block into separate documents if it contains more than one item with the requested tag
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/61fec3b6 Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/61fec3b6 Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/61fec3b6 Branch: refs/heads/steven/hdfs Commit: 61fec3b6759f01c3e8d5ace985228e81787af42a Parents: 924a01b Author: efikalti <[email protected]> Authored: Thu Jul 30 11:06:00 2015 +0300 Committer: efikalti <[email protected]> Committed: Thu Jul 30 11:06:00 2015 +0300 ---------------------------------------------------------------------- .../VXQueryCollectionOperatorDescriptor.java | 48 ++++++++++++-------- 1 file changed, 29 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/61fec3b6/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index b7627d0..a930457 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -32,6 +32,7 @@ import java.util.logging.Logger; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -71,6 +72,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName()); private HDFSFunctions 
hdfs; private String tag; + private final String START_TAG = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"; public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds, RecordDescriptor rDesc) { @@ -141,14 +143,8 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO Path directory = new Path(collectionModifiedName); Path xmlDocument; if (tag != null) { - String tags[] = tag.split("/"); - String start_tag = "<?xml version=\"1.0\" encoding=\"utf-8\"?>"; - String end_tag = ""; - for (int i = 0; i < tags.length - 1; i++) { - start_tag = start_tag + "<" + tags[i] + ">"; - end_tag = end_tag + "</" + tags[i] + ">"; - } - hdfs.setJob(directory.getName(), tags[tags.length - 1]); + hdfs.setJob(directory.getName(), tag); + tag = "<" + tag + ">"; Job job = hdfs.getJob(); InputFormat inputFormat = hdfs.getinputFormat(); try { @@ -164,22 +160,36 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO List<FileSplit> inputSplits = splitsFactory.getSplits(); ContextFactory ctxFactory = new ContextFactory(); int size = inputSplits.size(); + InputStream stream; + String value; + RecordReader reader; + TaskAttemptContext context; for (int i = 0; i < size; i++) { //read split - TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(), i); - RecordReader reader; + context = ctxFactory.createContext(job.getConfiguration(), i); try { reader = inputFormat.createRecordReader(inputSplits.get(i), context); reader.initialize(inputSplits.get(i), context); - while (reader.nextKeyValue() == true) { - String value = reader.getCurrentValue().toString(); - String xml = start_tag; - xml = xml + value + end_tag; - System.out.println(xml); - //create an input stream to the file currently reading and send it to parser - InputStream stream = new ByteArrayInputStream( - xml.getBytes(StandardCharsets.UTF_8)); - parser.parseHDFSElements(stream, writer, fta, i); + while 
(reader.nextKeyValue()) { + value = reader.getCurrentValue().toString(); + //Split value if it contains more than one item with the tag + if (StringUtils.countMatches(value, tag) > 1) { + String items[] = value.split(tag); + for (String item : items) { + if (item.length() > 0) { + item = START_TAG + tag + item; + stream = new ByteArrayInputStream( + item.getBytes(StandardCharsets.UTF_8)); + parser.parseHDFSElements(stream, writer, fta, i); + } + } + } else { + value = START_TAG + value; + //create an input stream to the file currently reading and send it to parser + stream = new ByteArrayInputStream( + value.getBytes(StandardCharsets.UTF_8)); + parser.parseHDFSElements(stream, writer, fta, i); + } } } catch (InterruptedException e) {
