http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java index 086ddba..9d5966f 100644 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java @@ -1,30 +1,30 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.jena.hadoop.rdf.io.input.nquads; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; -import org.apache.jena.hadoop.rdf.io.input.readers.nquads.WholeFileNQuadsReader; -import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; +import org.apache.jena.hadoop.rdf.io.input.readers.nquads.WholeFileNQuadsReader; +import org.apache.jena.hadoop.rdf.types.QuadWritable; /**
http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java index 81cc6a2..3536f5d 100644 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java @@ -1,30 +1,30 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.jena.hadoop.rdf.io.input.ntriples; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat; -import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.BlockedNTriplesReader; -import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat; +import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.BlockedNTriplesReader; +import org.apache.jena.hadoop.rdf.types.TripleWritable; /** http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java index 5bfa04c..bbe4c9a 100644 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java @@ -1,31 +1,31 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.jena.hadoop.rdf.io.input.ntriples; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat; -import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.NTriplesReader; -import org.apache.jena.hadoop.rdf.types.TripleWritable; - +import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat; +import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.NTriplesReader; +import org.apache.jena.hadoop.rdf.types.TripleWritable; + /** * NTriples input format http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java index 0b7db0a..5a06789 100644 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java @@ -1,30 +1,30 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.jena.hadoop.rdf.io.input.ntriples; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; -import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.WholeFileNTriplesReader; -import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; +import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.WholeFileNTriplesReader; +import org.apache.jena.hadoop.rdf.types.TripleWritable; /** http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java index 66b1833..29d0f6f 100644 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java @@ -1,30 +1,30 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.jena.hadoop.rdf.io.input.rdfjson; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; -import org.apache.jena.hadoop.rdf.io.input.readers.rdfjson.RdfJsonReader; -import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; +import org.apache.jena.hadoop.rdf.io.input.readers.rdfjson.RdfJsonReader; +import org.apache.jena.hadoop.rdf.types.TripleWritable; /** http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java index 0a2b25b..f231073 100644 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java @@ -1,30 +1,30 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.jena.hadoop.rdf.io.input.rdfxml; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; -import org.apache.jena.hadoop.rdf.io.input.readers.rdfxml.RdfXmlReader; -import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; +import org.apache.jena.hadoop.rdf.io.input.readers.rdfxml.RdfXmlReader; +import org.apache.jena.hadoop.rdf.types.TripleWritable; /** http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java index 33d2baf..cf5e621 100644 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java @@ -1,344 +1,344 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.hadoop.rdf.io.input.readers; - -import java.io.IOException; -import java.io.InputStream; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.jena.hadoop.rdf.io.RdfIOConstants; -import org.apache.jena.hadoop.rdf.io.input.util.BlockInputStream; -import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils; -import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; -import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream; -import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; -import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; -import org.apache.jena.riot.Lang; -import org.apache.jena.riot.RDFDataMgr; -import org.apache.jena.riot.ReaderRIOT; -import org.apache.jena.riot.lang.PipedRDFIterator; -import org.apache.jena.riot.lang.PipedRDFStream; -import org.apache.jena.riot.system.ParserProfile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An abstract implementation for a record reader that reads records from blocks - * of files, this is a hybrid between {@link AbstractLineBasedNodeTupleReader} - * and {@link AbstractWholeFileNodeTupleReader} in that it can only be used by - * formats which can be split by lines but reduces the overhead by parsing the - * split as a whole rather than as individual lines. - * <p> - * The keys produced are the approximate position in the file at which a tuple - * was found and the values will be node tuples. Positions are approximate - * because they are recorded after the point at which the most recent tuple was - * parsed from the input thus they reflect the approximate position in the - * stream immediately after which the triple was found. - * </p> - * - * - * - * @param <TValue> - * Value type - * @param <T> - * Tuple type - */ -public abstract class AbstractBlockBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractBlockBasedNodeTupleReader.class); - private CompressionCodec compressionCodecs; - private TrackableInputStream input; - private LongWritable key; - private long start, length; - private T tuple; - private TrackedPipedRDFStream<TValue> stream; - private PipedRDFIterator<TValue> iter; - private Thread parserThread; - private boolean finished = false; - private boolean ignoreBadTuples = true; - private boolean parserFinished = false; - private Throwable parserError = null; - - @Override - public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { - LOG.debug("initialize({}, {})", genericSplit, context); - - // Assuming file split - if (!(genericSplit instanceof FileSplit)) - throw new IOException("This record reader only supports FileSplit inputs"); - FileSplit split = (FileSplit) genericSplit; - - // Configuration - Configuration config = context.getConfiguration(); - this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true); - if (this.ignoreBadTuples) - LOG.warn( - "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown. Consider setting {} to false to disable this behaviour", - RdfIOConstants.INPUT_IGNORE_BAD_TUPLES); - - // Figure out what portion of the file to read - start = split.getStart(); - long end = start + split.getLength(); - final Path file = split.getPath(); - long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen(); - boolean readToEnd = end == totalLength; - CompressionCodecFactory factory = new CompressionCodecFactory(config); - this.compressionCodecs = factory.getCodec(file); - - LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength })); - - // Open the file and prepare the input stream - FileSystem fs = file.getFileSystem(config); - FSDataInputStream fileIn = fs.open(file); - this.length = split.getLength(); - if (start > 0) - fileIn.seek(start); - - if (this.compressionCodecs != null) { - // Compressed input - // For compressed input NLineInputFormat will have failed to find - // any line breaks and will give us a split from 0 -> (length - 1) - // Add 1 and re-verify readToEnd so we can abort correctly if ever - // given a partial split of a compressed file - end++; - readToEnd = end == totalLength; - if (start > 0 || !readToEnd) - throw new IOException("This record reader can only be used with compressed input where the split is a whole file"); - input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn)); - } else { - // Uncompressed input - - if (readToEnd) { - input = new TrackedInputStream(fileIn); - } else { - // Need to limit the portion of the file we are reading - input = new BlockInputStream(fileIn, split.getLength()); - } - } - - // Set up background thread for parser - iter = this.getPipedIterator(); - this.stream = this.getPipedStream(iter, this.input); - ParserProfile profile = RdfIOUtils.createParserProfile(context, file); - Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile); - this.parserThread = new Thread(parserRunnable); - this.parserThread.setDaemon(true); - this.parserThread.start(); - } - - /** - * Gets the RDF iterator to use - * - * @return Iterator - */ - protected abstract PipedRDFIterator<TValue> getPipedIterator(); - - /** - * Gets the RDF stream to parse to - * - * @param iterator - * Iterator - * @return RDF stream - */ - protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input); - - /** - * Gets the RDF language to use for parsing - * - * @return - */ - protected abstract Lang getRdfLanguage(); - - /** - * Creates the runnable upon which the parsing will run - * - * @param input - * Input - * @param stream - * Stream - * @param lang - * Language to use for parsing - * @return Parser runnable - */ - private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractBlockBasedNodeTupleReader reader, final InputStream input, - final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) { - return new Runnable() { - @Override - public void run() { - try { - ReaderRIOT riotReader = RDFDataMgr.createReader(lang); - riotReader.setParserProfile(profile); - riotReader.read(input, null, lang.getContentType(), stream, null); - //RDFDataMgr.parse(stream, input, null, lang); - reader.setParserFinished(null); - } catch (Throwable e) { - reader.setParserFinished(e); - } - } - }; - } - - /** - * Sets the parser thread finished state - * - * @param e - * Error (if any) - */ - private void setParserFinished(Throwable e) { - synchronized (this.parserThread) { - this.parserError = e; - this.parserFinished = true; - } - } - - /** - * Waits for the parser thread to have reported as finished - * - * @throws InterruptedException - */ - private void waitForParserFinished() throws InterruptedException { - do { - synchronized (this.parserThread) { - if (this.parserFinished) - return; - } - Thread.sleep(50); - } while (true); - } - - /** - * Creates an instance of a writable tuple from the given tuple value - * - * @param tuple - * Tuple value - * @return Writable tuple - */ - protected abstract T createInstance(TValue tuple); - - @Override - public boolean nextKeyValue() throws IOException { - // Reuse key for efficiency - if (key == null) { - key = new LongWritable(); - } - - if (this.finished) - return false; - - try { - if (this.iter.hasNext()) { - // Position will be relative to the start for the split we're - // processing - Long l = this.start + this.stream.getPosition(); - if (l != null) { - this.key.set(l); - // For compressed input the actual length from which we - // calculate progress is likely less than the actual - // uncompressed length so we need to increment the - // length as we go along - // We always add 1 more than the current length because we - // don't want to report 100% progress until we really have - // finished - if (this.compressionCodecs != null && l > this.length) - this.length = l + 1; - } - this.tuple = this.createInstance(this.iter.next()); - return true; - } else { - // Need to ensure that the parser thread has finished in order - // to determine whether we finished without error - this.waitForParserFinished(); - if (this.parserError != null) { - LOG.error("Error parsing block, aborting further parsing", this.parserError); - if (!this.ignoreBadTuples) - throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", - this.parserError); - } - - this.key = null; - this.tuple = null; - this.finished = true; - // This is necessary so that when compressed input is used we - // report 100% progress once we've reached the genuine end of - // the stream - if (this.compressionCodecs != null) - this.length--; - return false; - } - } catch (IOException e) { - throw e; - } catch (Throwable e) { - // Failed to read the tuple on this line - LOG.error("Error parsing block, aborting further parsing", e); - if (!this.ignoreBadTuples) { - this.iter.close(); - throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", e); - } - this.key = null; - this.tuple = null; - this.finished = true; - return false; - } - } - - @Override - public LongWritable getCurrentKey() { - return this.key; - } - - @Override - public T getCurrentValue() { - return this.tuple; - } - - @Override - public float getProgress() { - float progress = 0.0f; - if (this.key == null) { - // We've either not started or we've finished - progress = (this.finished ? 1.0f : 0.0f); - } else if (this.key.get() == Long.MIN_VALUE) { - // We don't have a position so we've either in-progress or finished - progress = (this.finished ? 1.0f : 0.5f); - } else { - // We're some way through the file - progress = (this.key.get() - this.start) / (float) this.length; - } - LOG.debug("getProgress() --> {}", progress); - return progress; - } - - @Override - public void close() throws IOException { - this.iter.close(); - this.input.close(); - this.finished = true; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.readers; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.hadoop.rdf.io.input.util.BlockInputStream; +import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils; +import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; +import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFDataMgr; +import org.apache.jena.riot.ReaderRIOT; +import org.apache.jena.riot.lang.PipedRDFIterator; +import org.apache.jena.riot.lang.PipedRDFStream; +import org.apache.jena.riot.system.ParserProfile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An abstract implementation for a record reader that reads records from blocks + * of files, this is a hybrid between {@link AbstractLineBasedNodeTupleReader} + * and {@link AbstractWholeFileNodeTupleReader} in that it can only be used by + * formats which can be split by lines but reduces the overhead by parsing the + * split as a whole rather than as individual lines. + * <p> + * The keys produced are the approximate position in the file at which a tuple + * was found and the values will be node tuples. Positions are approximate + * because they are recorded after the point at which the most recent tuple was + * parsed from the input thus they reflect the approximate position in the + * stream immediately after which the triple was found. + * </p> + * + * + * + * @param <TValue> + * Value type + * @param <T> + * Tuple type + */ +public abstract class AbstractBlockBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractBlockBasedNodeTupleReader.class); + private CompressionCodec compressionCodecs; + private TrackableInputStream input; + private LongWritable key; + private long start, length; + private T tuple; + private TrackedPipedRDFStream<TValue> stream; + private PipedRDFIterator<TValue> iter; + private Thread parserThread; + private boolean finished = false; + private boolean ignoreBadTuples = true; + private boolean parserFinished = false; + private Throwable parserError = null; + + @Override + public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { + LOG.debug("initialize({}, {})", genericSplit, context); + + // Assuming file split + if (!(genericSplit instanceof FileSplit)) + throw new IOException("This record reader only supports FileSplit inputs"); + FileSplit split = (FileSplit) genericSplit; + + // Configuration + Configuration config = context.getConfiguration(); + this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true); + if (this.ignoreBadTuples) + LOG.warn( + "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown. Consider setting {} to false to disable this behaviour", + RdfIOConstants.INPUT_IGNORE_BAD_TUPLES); + + // Figure out what portion of the file to read + start = split.getStart(); + long end = start + split.getLength(); + final Path file = split.getPath(); + long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen(); + boolean readToEnd = end == totalLength; + CompressionCodecFactory factory = new CompressionCodecFactory(config); + this.compressionCodecs = factory.getCodec(file); + + LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength })); + + // Open the file and prepare the input stream + FileSystem fs = file.getFileSystem(config); + FSDataInputStream fileIn = fs.open(file); + this.length = split.getLength(); + if (start > 0) + fileIn.seek(start); + + if (this.compressionCodecs != null) { + // Compressed input + // For compressed input NLineInputFormat will have failed to find + // any line breaks and will give us a split from 0 -> (length - 1) + // Add 1 and re-verify readToEnd so we can abort correctly if ever + // given a partial split of a compressed file + end++; + readToEnd = end == totalLength; + if (start > 0 || !readToEnd) + throw new IOException("This record reader can only be used with compressed input where the split is a whole file"); + input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn)); + } else { + // Uncompressed input + + if (readToEnd) { + input = new TrackedInputStream(fileIn); + } else { + // Need to limit the portion of the file we are reading + input = new BlockInputStream(fileIn, split.getLength()); + } + } + + // Set up background thread for parser + iter = this.getPipedIterator(); + this.stream = this.getPipedStream(iter, this.input); + ParserProfile profile = RdfIOUtils.createParserProfile(context, file); + Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile); + this.parserThread = new Thread(parserRunnable); + this.parserThread.setDaemon(true); + this.parserThread.start(); + } + + /** + * Gets the RDF iterator to use + * + * @return Iterator + */ + protected abstract PipedRDFIterator<TValue> getPipedIterator(); + + /** + * Gets the RDF stream to parse to + * + * @param iterator + * Iterator + * @return RDF stream + */ + protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input); + + /** + * Gets the RDF language to use for parsing + * + * @return + */ + protected abstract Lang getRdfLanguage(); + + /** + * Creates the runnable upon which the parsing will run + * + * @param input + * Input + * @param stream + * Stream + * @param lang + * Language to use for parsing + * @return Parser runnable + */ + private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractBlockBasedNodeTupleReader reader, final InputStream input, + final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) { + return new Runnable() { + @Override + public void run() { + try { + ReaderRIOT riotReader = RDFDataMgr.createReader(lang); + riotReader.setParserProfile(profile); + riotReader.read(input, null, lang.getContentType(), stream, null); + //RDFDataMgr.parse(stream, input, null, lang); + reader.setParserFinished(null); + } catch (Throwable e) { + reader.setParserFinished(e); + } + } + }; + } + + /** + * Sets the parser thread finished state + * + * @param e + * Error (if any) + */ + private void setParserFinished(Throwable e) { + synchronized (this.parserThread) { + this.parserError = e; + this.parserFinished = true; + } + } + + /** + * Waits for the parser thread to have reported as finished + * + * @throws InterruptedException + */ + private void waitForParserFinished() throws InterruptedException { + do { + synchronized (this.parserThread) { + if (this.parserFinished) + return; + } + Thread.sleep(50); + } while (true); + } + + /** + * Creates an instance of a writable tuple from the given tuple value + * + * @param tuple + * Tuple value + * @return Writable tuple + */ + protected abstract T createInstance(TValue tuple); + + @Override + public boolean nextKeyValue() throws IOException { + // Reuse key for efficiency + if (key == null) { + key = new LongWritable(); + } + + if (this.finished) + return false; + + try { + if (this.iter.hasNext()) { + // Position will be relative to the start for the split we're + // processing + Long l = this.start + this.stream.getPosition(); + if (l != null) { + this.key.set(l); + // For compressed input the actual length from which we + // calculate progress is likely less than the actual + // uncompressed length so we need to increment the + // length as we go along + // We always add 1 more than the current length because we + // don't want to report 100% progress until we really have + // finished + if (this.compressionCodecs != null && l > this.length) + this.length = l + 1; + } + this.tuple = this.createInstance(this.iter.next()); + return true; + } else { + // Need to ensure that the parser thread has finished in order + // to determine whether we finished without error + this.waitForParserFinished(); + if (this.parserError != null) { + LOG.error("Error parsing block, aborting further parsing", this.parserError); + if (!this.ignoreBadTuples) + throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", + this.parserError); + } + + this.key = null; + this.tuple = null; + this.finished = true; + // This is necessary so that when compressed input is used we + // report 100% progress once we've reached the genuine end of + // the stream + if (this.compressionCodecs != null) + this.length--; + return false; + } + } catch (IOException e) { + throw e; + } catch (Throwable e) { + // Failed to read the tuple on this line + LOG.error("Error parsing block, aborting further parsing", e); + if (!this.ignoreBadTuples) { + this.iter.close(); + throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", e); + } + this.key = null; + this.tuple = null; + this.finished = true; + return false; + } + } + + @Override + public LongWritable getCurrentKey() { + return this.key; + } + + @Override + public T getCurrentValue() { + return this.tuple; + } + + @Override + public float getProgress() { + float progress = 0.0f; + if (this.key == null) { + // We've either not started or we've finished + progress = (this.finished ? 1.0f : 0.0f); + } else if (this.key.get() == Long.MIN_VALUE) { + // We don't have a position so we've either in-progress or finished + progress = (this.finished ? 1.0f : 0.5f); + } else { + // We're some way through the file + progress = (this.key.get() - this.start) / (float) this.length; + } + LOG.debug("getProgress() --> {}", progress); + return progress; + } + + @Override + public void close() throws IOException { + this.iter.close(); + this.input.close(); + this.finished = true; + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java index adc431f..c6b2a6f 100644 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java @@ -1,29 +1,29 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.jena.hadoop.rdf.io.input.readers; -import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; -import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream; -import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; -import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; +import org.apache.jena.hadoop.rdf.types.QuadWritable; import org.apache.jena.riot.lang.PipedRDFIterator; -import org.apache.jena.sparql.core.Quad ; +import org.apache.jena.sparql.core.Quad ; /** * An abstract record reader for whole file triple formats http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java index 43a171c..384b223 100644 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java @@ -1,28 +1,28 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.jena.hadoop.rdf.io.input.readers; -import org.apache.jena.graph.Triple ; -import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; -import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; -import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream; -import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.graph.Triple ; +import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream; +import org.apache.jena.hadoop.rdf.types.TripleWritable; import org.apache.jena.riot.lang.PipedRDFIterator; /** http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java index 87d2e06..04adb3b 100644 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java @@ -1,265 +1,265 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.hadoop.rdf.io.input.readers; - -import java.io.IOException; -import java.util.Iterator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.hadoop.util.LineReader; -import org.apache.jena.hadoop.rdf.io.HadoopIOConstants; -import org.apache.jena.hadoop.rdf.io.RdfIOConstants; -import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils; -import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; -import org.apache.jena.riot.system.ParserProfile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An abstract implementation of a record reader that reads records from line - * based tuple formats. This only supports reading from file splits currently. - * <p> - * The keys produced are the position of the line in the file and the values - * will be node tuples - * </p> - * - * - * - * @param <TValue> - * @param <T> - * Writable tuple type - */ -public abstract class AbstractLineBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractLineBasedNodeTupleReader.class); - private CompressionCodecFactory compressionCodecs = null; - private long start, pos, end, estLength; - private int maxLineLength; - private LineReader in; - private LongWritable key = null; - private Text value = null; - private T tuple = null; - private ParserProfile profile = null; - private boolean ignoreBadTuples = true; - - @Override - public final void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { - LOG.debug("initialize({}, {})", genericSplit, context); - - // Assuming file split - if (!(genericSplit instanceof FileSplit)) - throw new IOException("This record reader only supports FileSplit inputs"); - FileSplit split = (FileSplit) genericSplit; - - // Configuration - profile = RdfIOUtils.createParserProfile(context, split.getPath()); - Configuration config = context.getConfiguration(); - this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true); - if (this.ignoreBadTuples) - LOG.warn( - "Configured to ignore bad tuples, parsing errors will be logged and the bad line skipped but no errors will be thrownConsider setting {} to false to disable this behaviour", - RdfIOConstants.INPUT_IGNORE_BAD_TUPLES); - - // Figure out what portion of the file to read - this.maxLineLength = config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE); - start = split.getStart(); - end = start + split.getLength(); - final Path file = split.getPath(); - long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen(); - compressionCodecs = new CompressionCodecFactory(config); - final CompressionCodec codec = compressionCodecs.getCodec(file); - - LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength })); - - // Open the file and seek to the start of the split - FileSystem fs = file.getFileSystem(config); - FSDataInputStream fileIn = fs.open(file); - boolean skipFirstLine = false; - if (codec != null) { - // Compressed input - // For compressed input NLineInputFormat will have failed to find - // any line breaks and will give us a split from 0 -> (length - 1) - // Add 1 and verify we got complete split - if (totalLength > split.getLength() + 1) - throw new IOException("This record reader can only be used with compressed input where the split covers the whole file"); - in = new LineReader(codec.createInputStream(fileIn), config); - estLength = end; - end = Long.MAX_VALUE; - } else { - // Uncompressed input - if (start != 0) { - skipFirstLine = true; - --start; - fileIn.seek(start); - } - in = new LineReader(fileIn, config); - } - // Skip first line and re-establish "start". - // This is to do with how line reader reads lines and how - // NLineInputFormat will provide the split information to use - if (skipFirstLine) { - start += in.readLine(new Text(), 0, (int) Math.min(Integer.MAX_VALUE, end - start)); - } - this.pos = start; - } - - /** - * Gets an iterator over the data on the current line - * - * @param line - * Line - * @param profile - * Parser profile - * @return Iterator - */ - protected abstract Iterator<TValue> getIterator(String line, ParserProfile profile); - - /** - * Creates an instance of a writable tuple from the given tuple value - * - * @param tuple - * Tuple value - * @return Writable tuple - */ - protected abstract T createInstance(TValue tuple); - - @Override - public final boolean nextKeyValue() throws IOException { - // Reuse key for efficiency - if (key == null) { - key = new LongWritable(); - } - - // Reset value which we use for reading lines - if (value == null) { - value = new Text(); - } - tuple = null; - - // Try to read the next valid line - int newSize = 0; - while (pos < end) { - // Read next line - newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); - - // Once we get an empty line we've reached the end of our input - if (newSize == 0) { - break; - } - - // Update position, remember that where inputs are compressed we may - // be at a larger position then we expected because the length of - // the split is likely less than the length of the data once - // decompressed - key.set(pos); - pos += newSize; - if (pos > estLength) - estLength = pos + 1; - - // Skip lines that exceed the line length limit that has been set - if (newSize >= maxLineLength) { - LOG.warn("Skipped oversized line of size {} at position {}", newSize, (pos - newSize)); - continue; - } - - // Attempt to read the tuple from current line - try { - Iterator<TValue> iter = this.getIterator(value.toString(), profile); - if (iter.hasNext()) { - tuple = this.createInstance(iter.next()); - - // If we reach here we've found a valid tuple so we can - // break out of the loop - break; - } else { - // Empty line/Comment line - LOG.debug("Valid line with no triple at position {}", (pos - newSize)); - continue; - } - } catch (Throwable e) { - // Failed to read the tuple on this line - LOG.error("Bad tuple at position " + (pos - newSize), e); - if (this.ignoreBadTuples) - continue; - throw new IOException(String.format("Bad tuple at position %d", (pos - newSize)), e); - } - } - boolean result = this.tuple != null; - - // End of input - if (newSize == 0) { - key = null; - value = null; - tuple = null; - result = false; - estLength = pos; - } - LOG.debug("nextKeyValue() --> {}", result); - return result; - } - - @Override - public LongWritable getCurrentKey() { - LOG.debug("getCurrentKey() --> {}", key); - return key; - } - - @Override - public T getCurrentValue() { - LOG.debug("getCurrentValue() --> {}", tuple); - return tuple; - } - - @Override - public float getProgress() { - float progress = 0.0f; - if (start != end) { - if (end == Long.MAX_VALUE) { - if (estLength == 0) - return 1.0f; - // Use estimated length - progress = Math.min(1.0f, (pos - start) / (float) (estLength - start)); - } else { - // Use actual length - progress = Math.min(1.0f, (pos - start) / (float) (end - start)); - } - } - LOG.debug("getProgress() --> {}", progress); - return progress; - } - - @Override - public void close() throws IOException { - LOG.debug("close()"); - if (in != null) { - in.close(); - } - } - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.readers; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.util.LineReader; +import org.apache.jena.hadoop.rdf.io.HadoopIOConstants; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils; +import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; +import org.apache.jena.riot.system.ParserProfile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An abstract implementation of a record reader that reads records from line + * based tuple formats. This only supports reading from file splits currently. + * <p> + * The keys produced are the position of the line in the file and the values + * will be node tuples + * </p> + * + * + * + * @param <TValue> + * @param <T> + * Writable tuple type + */ +public abstract class AbstractLineBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractLineBasedNodeTupleReader.class); + private CompressionCodecFactory compressionCodecs = null; + private long start, pos, end, estLength; + private int maxLineLength; + private LineReader in; + private LongWritable key = null; + private Text value = null; + private T tuple = null; + private ParserProfile profile = null; + private boolean ignoreBadTuples = true; + + @Override + public final void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { + LOG.debug("initialize({}, {})", genericSplit, context); + + // Assuming file split + if (!(genericSplit instanceof FileSplit)) + throw new IOException("This record reader only supports FileSplit inputs"); + FileSplit split = (FileSplit) genericSplit; + + // Configuration + profile = RdfIOUtils.createParserProfile(context, split.getPath()); + Configuration config = context.getConfiguration(); + this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true); + if (this.ignoreBadTuples) + LOG.warn( + "Configured to ignore bad tuples, parsing errors will be logged and the bad line skipped but no errors will be thrownConsider setting {} to false to disable this behaviour", + RdfIOConstants.INPUT_IGNORE_BAD_TUPLES); + + // Figure out what portion of the file to read + this.maxLineLength = config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE); + start = split.getStart(); + end = start + split.getLength(); + final Path file = split.getPath(); + long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen(); + compressionCodecs = new CompressionCodecFactory(config); + final CompressionCodec codec = compressionCodecs.getCodec(file); + + LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength })); + + // Open the file and seek to the start of the split + FileSystem fs = file.getFileSystem(config); + FSDataInputStream fileIn = fs.open(file); + boolean skipFirstLine = false; + if (codec != null) { + // Compressed input + // For compressed input NLineInputFormat will have failed to find + // any line breaks and will give us a split from 0 -> (length - 1) + // Add 1 and verify we got complete split + if (totalLength > split.getLength() + 1) + throw new IOException("This record reader can only be used with compressed input where the split covers the whole file"); + in = new LineReader(codec.createInputStream(fileIn), config); + estLength = end; + end = Long.MAX_VALUE; + } else { + // Uncompressed input + if (start != 0) { + skipFirstLine = true; + --start; + fileIn.seek(start); + } + in = new LineReader(fileIn, config); + } + // Skip first line and re-establish "start". + // This is to do with how line reader reads lines and how + // NLineInputFormat will provide the split information to use + if (skipFirstLine) { + start += in.readLine(new Text(), 0, (int) Math.min(Integer.MAX_VALUE, end - start)); + } + this.pos = start; + } + + /** + * Gets an iterator over the data on the current line + * + * @param line + * Line + * @param profile + * Parser profile + * @return Iterator + */ + protected abstract Iterator<TValue> getIterator(String line, ParserProfile profile); + + /** + * Creates an instance of a writable tuple from the given tuple value + * + * @param tuple + * Tuple value + * @return Writable tuple + */ + protected abstract T createInstance(TValue tuple); + + @Override + public final boolean nextKeyValue() throws IOException { + // Reuse key for efficiency + if (key == null) { + key = new LongWritable(); + } + + // Reset value which we use for reading lines + if (value == null) { + value = new Text(); + } + tuple = null; + + // Try to read the next valid line + int newSize = 0; + while (pos < end) { + // Read next line + newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); + + // Once we get an empty line we've reached the end of our input + if (newSize == 0) { + break; + } + + // Update position, remember that where inputs are compressed we may + // be at a larger position then we expected because the length of + // the split is likely less than the length of the data once + // decompressed + key.set(pos); + pos += newSize; + if (pos > estLength) + estLength = pos + 1; + + // Skip lines that exceed the line length limit that has been set + if (newSize >= maxLineLength) { + LOG.warn("Skipped oversized line of size {} at position {}", newSize, (pos - newSize)); + continue; + } + + // Attempt to read the tuple from current line + try { + Iterator<TValue> iter = this.getIterator(value.toString(), profile); + if (iter.hasNext()) { + tuple = this.createInstance(iter.next()); + + // If we reach here we've found a valid tuple so we can + // break out of the loop + break; + } else { + // Empty line/Comment line + LOG.debug("Valid line with no triple at position {}", (pos - newSize)); + continue; + } + } catch (Throwable e) { + // Failed to read the tuple on this line + LOG.error("Bad tuple at position " + (pos - newSize), e); + if (this.ignoreBadTuples) + continue; + throw new IOException(String.format("Bad tuple at position %d", (pos - newSize)), e); + } + } + boolean result = this.tuple != null; + + // End of input + if (newSize == 0) { + key = null; + value = null; + tuple = null; + result = false; + estLength = pos; + } + LOG.debug("nextKeyValue() --> {}", result); + return result; + } + + @Override + public LongWritable getCurrentKey() { + LOG.debug("getCurrentKey() --> {}", key); + return key; + } + + @Override + public T getCurrentValue() { + LOG.debug("getCurrentValue() --> {}", tuple); + return tuple; + } + + @Override + public float getProgress() { + float progress = 0.0f; + if (start != end) { + if (end == Long.MAX_VALUE) { + if (estLength == 0) + return 1.0f; + // Use estimated length + progress = Math.min(1.0f, (pos - start) / (float) (estLength - start)); + } else { + // Use actual length + progress = Math.min(1.0f, (pos - start) / (float) (end - start)); + } + } + LOG.debug("getProgress() --> {}", progress); + return progress; + } + + @Override + public void close() throws IOException { + LOG.debug("close()"); + if (in != null) { + in.close(); + } + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java index 8aec04c..84c168d 100644 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java @@ -1,29 +1,29 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.jena.hadoop.rdf.io.input.readers; import java.util.Iterator; - -import org.apache.jena.hadoop.rdf.types.QuadWritable; + +import org.apache.jena.hadoop.rdf.types.QuadWritable; import org.apache.jena.riot.system.ParserProfile; import org.apache.jena.riot.tokens.Tokenizer; -import org.apache.jena.sparql.core.Quad ; +import org.apache.jena.sparql.core.Quad ; /** * An abstract reader for line based quad formats http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java index a67e5f9..780630d 100644 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java @@ -1,27 +1,27 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.jena.hadoop.rdf.io.input.readers; import java.util.Iterator; - -import org.apache.jena.graph.Triple ; -import org.apache.jena.hadoop.rdf.types.TripleWritable; + +import org.apache.jena.graph.Triple ; +import org.apache.jena.hadoop.rdf.types.TripleWritable; import org.apache.jena.riot.system.ParserProfile; import org.apache.jena.riot.tokens.Tokenizer;
