http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java ---------------------------------------------------------------------- diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java index e1110b6,e1110b6..cda77e5 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java @@@ -1,31 -1,31 +1,31 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ package org.apache.jena.hadoop.rdf.io.input.nquads; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; --import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat; --import org.apache.jena.hadoop.rdf.io.input.readers.nquads.NQuadsReader; --import org.apache.jena.hadoop.rdf.types.QuadWritable; -- ++import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat; ++import org.apache.jena.hadoop.rdf.io.input.readers.nquads.NQuadsReader; ++import org.apache.jena.hadoop.rdf.types.QuadWritable; ++ /** * NQuads input format
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java ---------------------------------------------------------------------- diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java index 9d5966f,9d5966f..086ddba --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java @@@ -1,30 -1,30 +1,30 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ package org.apache.jena.hadoop.rdf.io.input.nquads; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; --import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; --import org.apache.jena.hadoop.rdf.io.input.readers.nquads.WholeFileNQuadsReader; --import org.apache.jena.hadoop.rdf.types.QuadWritable; ++import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; ++import org.apache.jena.hadoop.rdf.io.input.readers.nquads.WholeFileNQuadsReader; ++import org.apache.jena.hadoop.rdf.types.QuadWritable; /** http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java ---------------------------------------------------------------------- diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java index 3536f5d,3536f5d..81cc6a2 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java @@@ -1,30 -1,30 +1,30 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. 
See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ package org.apache.jena.hadoop.rdf.io.input.ntriples; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; --import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat; --import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.BlockedNTriplesReader; --import org.apache.jena.hadoop.rdf.types.TripleWritable; ++import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat; ++import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.BlockedNTriplesReader; ++import org.apache.jena.hadoop.rdf.types.TripleWritable; /** http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java ---------------------------------------------------------------------- diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java index bbe4c9a,bbe4c9a..5bfa04c --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java @@@ -1,31 -1,31 +1,31 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. 
You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ package org.apache.jena.hadoop.rdf.io.input.ntriples; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; --import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat; --import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.NTriplesReader; --import org.apache.jena.hadoop.rdf.types.TripleWritable; -- ++import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat; ++import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.NTriplesReader; ++import org.apache.jena.hadoop.rdf.types.TripleWritable; ++ /** * NTriples input format http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java ---------------------------------------------------------------------- diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java index 5a06789,5a06789..0b7db0a --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java @@@ -1,30 -1,30 +1,30 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. 
You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ package org.apache.jena.hadoop.rdf.io.input.ntriples; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; --import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; --import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.WholeFileNTriplesReader; --import org.apache.jena.hadoop.rdf.types.TripleWritable; ++import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; ++import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.WholeFileNTriplesReader; ++import org.apache.jena.hadoop.rdf.types.TripleWritable; /** http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java ---------------------------------------------------------------------- diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java index 29d0f6f,29d0f6f..66b1833 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java @@@ -1,30 -1,30 +1,30 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. 
You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ package org.apache.jena.hadoop.rdf.io.input.rdfjson; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; --import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; --import org.apache.jena.hadoop.rdf.io.input.readers.rdfjson.RdfJsonReader; --import org.apache.jena.hadoop.rdf.types.TripleWritable; ++import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; ++import org.apache.jena.hadoop.rdf.io.input.readers.rdfjson.RdfJsonReader; ++import org.apache.jena.hadoop.rdf.types.TripleWritable; /** http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java ---------------------------------------------------------------------- diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java index f231073,f231073..0a2b25b --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java @@@ -1,30 -1,30 +1,30 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. 
You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ package org.apache.jena.hadoop.rdf.io.input.rdfxml; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; --import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; --import org.apache.jena.hadoop.rdf.io.input.readers.rdfxml.RdfXmlReader; --import org.apache.jena.hadoop.rdf.types.TripleWritable; ++import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; ++import org.apache.jena.hadoop.rdf.io.input.readers.rdfxml.RdfXmlReader; ++import org.apache.jena.hadoop.rdf.types.TripleWritable; /** http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java ---------------------------------------------------------------------- diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java index cf5e621,cf5e621..33d2baf --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java @@@ -1,344 -1,344 +1,344 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. 
You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.jena.hadoop.rdf.io.input.readers; -- --import java.io.IOException; --import java.io.InputStream; -- --import org.apache.hadoop.conf.Configuration; --import org.apache.hadoop.fs.FSDataInputStream; --import org.apache.hadoop.fs.FileSystem; --import org.apache.hadoop.fs.Path; --import org.apache.hadoop.io.LongWritable; --import org.apache.hadoop.io.compress.CompressionCodec; --import org.apache.hadoop.io.compress.CompressionCodecFactory; --import org.apache.hadoop.mapreduce.InputSplit; --import org.apache.hadoop.mapreduce.RecordReader; --import org.apache.hadoop.mapreduce.TaskAttemptContext; --import org.apache.hadoop.mapreduce.lib.input.FileSplit; --import org.apache.jena.hadoop.rdf.io.RdfIOConstants; --import org.apache.jena.hadoop.rdf.io.input.util.BlockInputStream; --import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils; --import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; --import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream; --import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; --import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; --import org.apache.jena.riot.Lang; --import org.apache.jena.riot.RDFDataMgr; --import org.apache.jena.riot.ReaderRIOT; --import org.apache.jena.riot.lang.PipedRDFIterator; --import org.apache.jena.riot.lang.PipedRDFStream; --import org.apache.jena.riot.system.ParserProfile; --import org.slf4j.Logger; --import org.slf4j.LoggerFactory; -- --/** -- * An abstract implementation for a record reader that reads records 
from blocks -- * of files, this is a hybrid between {@link AbstractLineBasedNodeTupleReader} -- * and {@link AbstractWholeFileNodeTupleReader} in that it can only be used by -- * formats which can be split by lines but reduces the overhead by parsing the -- * split as a whole rather than as individual lines. -- * <p> -- * The keys produced are the approximate position in the file at which a tuple -- * was found and the values will be node tuples. Positions are approximate -- * because they are recorded after the point at which the most recent tuple was -- * parsed from the input thus they reflect the approximate position in the -- * stream immediately after which the triple was found. -- * </p> -- * -- * -- * -- * @param <TValue> -- * Value type -- * @param <T> -- * Tuple type -- */ --public abstract class AbstractBlockBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> { -- -- private static final Logger LOG = LoggerFactory.getLogger(AbstractBlockBasedNodeTupleReader.class); -- private CompressionCodec compressionCodecs; -- private TrackableInputStream input; -- private LongWritable key; -- private long start, length; -- private T tuple; -- private TrackedPipedRDFStream<TValue> stream; -- private PipedRDFIterator<TValue> iter; -- private Thread parserThread; -- private boolean finished = false; -- private boolean ignoreBadTuples = true; -- private boolean parserFinished = false; -- private Throwable parserError = null; -- -- @Override -- public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { -- LOG.debug("initialize({}, {})", genericSplit, context); -- -- // Assuming file split -- if (!(genericSplit instanceof FileSplit)) -- throw new IOException("This record reader only supports FileSplit inputs"); -- FileSplit split = (FileSplit) genericSplit; -- -- // Configuration -- Configuration config = context.getConfiguration(); -- this.ignoreBadTuples = 
config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true); -- if (this.ignoreBadTuples) -- LOG.warn( -- "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown. Consider setting {} to false to disable this behaviour", -- RdfIOConstants.INPUT_IGNORE_BAD_TUPLES); -- -- // Figure out what portion of the file to read -- start = split.getStart(); -- long end = start + split.getLength(); -- final Path file = split.getPath(); -- long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen(); -- boolean readToEnd = end == totalLength; -- CompressionCodecFactory factory = new CompressionCodecFactory(config); -- this.compressionCodecs = factory.getCodec(file); -- -- LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength })); -- -- // Open the file and prepare the input stream -- FileSystem fs = file.getFileSystem(config); -- FSDataInputStream fileIn = fs.open(file); -- this.length = split.getLength(); -- if (start > 0) -- fileIn.seek(start); -- -- if (this.compressionCodecs != null) { -- // Compressed input -- // For compressed input NLineInputFormat will have failed to find -- // any line breaks and will give us a split from 0 -> (length - 1) -- // Add 1 and re-verify readToEnd so we can abort correctly if ever -- // given a partial split of a compressed file -- end++; -- readToEnd = end == totalLength; -- if (start > 0 || !readToEnd) -- throw new IOException("This record reader can only be used with compressed input where the split is a whole file"); -- input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn)); -- } else { -- // Uncompressed input -- -- if (readToEnd) { -- input = new TrackedInputStream(fileIn); -- } else { -- // Need to limit the portion of the file we are reading -- input = new BlockInputStream(fileIn, split.getLength()); -- 
} -- } -- -- // Set up background thread for parser -- iter = this.getPipedIterator(); -- this.stream = this.getPipedStream(iter, this.input); -- ParserProfile profile = RdfIOUtils.createParserProfile(context, file); -- Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile); -- this.parserThread = new Thread(parserRunnable); -- this.parserThread.setDaemon(true); -- this.parserThread.start(); -- } -- -- /** -- * Gets the RDF iterator to use -- * -- * @return Iterator -- */ -- protected abstract PipedRDFIterator<TValue> getPipedIterator(); -- -- /** -- * Gets the RDF stream to parse to -- * -- * @param iterator -- * Iterator -- * @return RDF stream -- */ -- protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input); -- -- /** -- * Gets the RDF language to use for parsing -- * -- * @return -- */ -- protected abstract Lang getRdfLanguage(); -- -- /** -- * Creates the runnable upon which the parsing will run -- * -- * @param input -- * Input -- * @param stream -- * Stream -- * @param lang -- * Language to use for parsing -- * @return Parser runnable -- */ -- private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractBlockBasedNodeTupleReader reader, final InputStream input, -- final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) { -- return new Runnable() { -- @Override -- public void run() { -- try { -- ReaderRIOT riotReader = RDFDataMgr.createReader(lang); -- riotReader.setParserProfile(profile); -- riotReader.read(input, null, lang.getContentType(), stream, null); -- //RDFDataMgr.parse(stream, input, null, lang); -- reader.setParserFinished(null); -- } catch (Throwable e) { -- reader.setParserFinished(e); -- } -- } -- }; -- } -- -- /** -- * Sets the parser thread finished state -- * -- * @param e -- * Error (if any) -- */ -- private void setParserFinished(Throwable e) { -- synchronized (this.parserThread) 
{ -- this.parserError = e; -- this.parserFinished = true; -- } -- } -- -- /** -- * Waits for the parser thread to have reported as finished -- * -- * @throws InterruptedException -- */ -- private void waitForParserFinished() throws InterruptedException { -- do { -- synchronized (this.parserThread) { -- if (this.parserFinished) -- return; -- } -- Thread.sleep(50); -- } while (true); -- } -- -- /** -- * Creates an instance of a writable tuple from the given tuple value -- * -- * @param tuple -- * Tuple value -- * @return Writable tuple -- */ -- protected abstract T createInstance(TValue tuple); -- -- @Override -- public boolean nextKeyValue() throws IOException { -- // Reuse key for efficiency -- if (key == null) { -- key = new LongWritable(); -- } -- -- if (this.finished) -- return false; -- -- try { -- if (this.iter.hasNext()) { -- // Position will be relative to the start for the split we're -- // processing -- Long l = this.start + this.stream.getPosition(); -- if (l != null) { -- this.key.set(l); -- // For compressed input the actual length from which we -- // calculate progress is likely less than the actual -- // uncompressed length so we need to increment the -- // length as we go along -- // We always add 1 more than the current length because we -- // don't want to report 100% progress until we really have -- // finished -- if (this.compressionCodecs != null && l > this.length) -- this.length = l + 1; -- } -- this.tuple = this.createInstance(this.iter.next()); -- return true; -- } else { -- // Need to ensure that the parser thread has finished in order -- // to determine whether we finished without error -- this.waitForParserFinished(); -- if (this.parserError != null) { -- LOG.error("Error parsing block, aborting further parsing", this.parserError); -- if (!this.ignoreBadTuples) -- throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", -- this.parserError); -- } -- -- this.key = 
null; -- this.tuple = null; -- this.finished = true; -- // This is necessary so that when compressed input is used we -- // report 100% progress once we've reached the genuine end of -- // the stream -- if (this.compressionCodecs != null) -- this.length--; -- return false; -- } -- } catch (IOException e) { -- throw e; -- } catch (Throwable e) { -- // Failed to read the tuple on this line -- LOG.error("Error parsing block, aborting further parsing", e); -- if (!this.ignoreBadTuples) { -- this.iter.close(); -- throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", e); -- } -- this.key = null; -- this.tuple = null; -- this.finished = true; -- return false; -- } -- } -- -- @Override -- public LongWritable getCurrentKey() { -- return this.key; -- } -- -- @Override -- public T getCurrentValue() { -- return this.tuple; -- } -- -- @Override -- public float getProgress() { -- float progress = 0.0f; -- if (this.key == null) { -- // We've either not started or we've finished -- progress = (this.finished ? 1.0f : 0.0f); -- } else if (this.key.get() == Long.MIN_VALUE) { -- // We don't have a position so we've either in-progress or finished -- progress = (this.finished ? 1.0f : 0.5f); -- } else { -- // We're some way through the file -- progress = (this.key.get() - this.start) / (float) this.length; -- } -- LOG.debug("getProgress() --> {}", progress); -- return progress; -- } -- -- @Override -- public void close() throws IOException { -- this.iter.close(); -- this.input.close(); -- this.finished = true; -- } -- --} ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. 
The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.jena.hadoop.rdf.io.input.readers; ++ ++import java.io.IOException; ++import java.io.InputStream; ++ ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.fs.FSDataInputStream; ++import org.apache.hadoop.fs.FileSystem; ++import org.apache.hadoop.fs.Path; ++import org.apache.hadoop.io.LongWritable; ++import org.apache.hadoop.io.compress.CompressionCodec; ++import org.apache.hadoop.io.compress.CompressionCodecFactory; ++import org.apache.hadoop.mapreduce.InputSplit; ++import org.apache.hadoop.mapreduce.RecordReader; ++import org.apache.hadoop.mapreduce.TaskAttemptContext; ++import org.apache.hadoop.mapreduce.lib.input.FileSplit; ++import org.apache.jena.hadoop.rdf.io.RdfIOConstants; ++import org.apache.jena.hadoop.rdf.io.input.util.BlockInputStream; ++import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils; ++import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; ++import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream; ++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; ++import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; ++import org.apache.jena.riot.Lang; ++import org.apache.jena.riot.RDFDataMgr; ++import org.apache.jena.riot.ReaderRIOT; ++import org.apache.jena.riot.lang.PipedRDFIterator; ++import org.apache.jena.riot.lang.PipedRDFStream; ++import 
org.apache.jena.riot.system.ParserProfile; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ ++/** ++ * An abstract implementation for a record reader that reads records from blocks ++ * of files, this is a hybrid between {@link AbstractLineBasedNodeTupleReader} ++ * and {@link AbstractWholeFileNodeTupleReader} in that it can only be used by ++ * formats which can be split by lines but reduces the overhead by parsing the ++ * split as a whole rather than as individual lines. ++ * <p> ++ * The keys produced are the approximate position in the file at which a tuple ++ * was found and the values will be node tuples. Positions are approximate ++ * because they are recorded after the point at which the most recent tuple was ++ * parsed from the input thus they reflect the approximate position in the ++ * stream immediately after which the triple was found. ++ * </p> ++ * ++ * ++ * ++ * @param <TValue> ++ * Value type ++ * @param <T> ++ * Tuple type ++ */ ++public abstract class AbstractBlockBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> { ++ ++ private static final Logger LOG = LoggerFactory.getLogger(AbstractBlockBasedNodeTupleReader.class); ++ private CompressionCodec compressionCodecs; ++ private TrackableInputStream input; ++ private LongWritable key; ++ private long start, length; ++ private T tuple; ++ private TrackedPipedRDFStream<TValue> stream; ++ private PipedRDFIterator<TValue> iter; ++ private Thread parserThread; ++ private boolean finished = false; ++ private boolean ignoreBadTuples = true; ++ private boolean parserFinished = false; ++ private Throwable parserError = null; ++ ++ @Override ++ public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { ++ LOG.debug("initialize({}, {})", genericSplit, context); ++ ++ // Assuming file split ++ if (!(genericSplit instanceof FileSplit)) ++ throw new IOException("This record reader only supports 
FileSplit inputs"); ++ FileSplit split = (FileSplit) genericSplit; ++ ++ // Configuration ++ Configuration config = context.getConfiguration(); ++ this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true); ++ if (this.ignoreBadTuples) ++ LOG.warn( ++ "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown. Consider setting {} to false to disable this behaviour", ++ RdfIOConstants.INPUT_IGNORE_BAD_TUPLES); ++ ++ // Figure out what portion of the file to read ++ start = split.getStart(); ++ long end = start + split.getLength(); ++ final Path file = split.getPath(); ++ long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen(); ++ boolean readToEnd = end == totalLength; ++ CompressionCodecFactory factory = new CompressionCodecFactory(config); ++ this.compressionCodecs = factory.getCodec(file); ++ ++ LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength })); ++ ++ // Open the file and prepare the input stream ++ FileSystem fs = file.getFileSystem(config); ++ FSDataInputStream fileIn = fs.open(file); ++ this.length = split.getLength(); ++ if (start > 0) ++ fileIn.seek(start); ++ ++ if (this.compressionCodecs != null) { ++ // Compressed input ++ // For compressed input NLineInputFormat will have failed to find ++ // any line breaks and will give us a split from 0 -> (length - 1) ++ // Add 1 and re-verify readToEnd so we can abort correctly if ever ++ // given a partial split of a compressed file ++ end++; ++ readToEnd = end == totalLength; ++ if (start > 0 || !readToEnd) ++ throw new IOException("This record reader can only be used with compressed input where the split is a whole file"); ++ input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn)); ++ } else { ++ // Uncompressed input ++ ++ if (readToEnd) { ++ 
input = new TrackedInputStream(fileIn); ++ } else { ++ // Need to limit the portion of the file we are reading ++ input = new BlockInputStream(fileIn, split.getLength()); ++ } ++ } ++ ++ // Set up background thread for parser ++ iter = this.getPipedIterator(); ++ this.stream = this.getPipedStream(iter, this.input); ++ ParserProfile profile = RdfIOUtils.createParserProfile(context, file); ++ Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile); ++ this.parserThread = new Thread(parserRunnable); ++ this.parserThread.setDaemon(true); ++ this.parserThread.start(); ++ } ++ ++ /** ++ * Gets the RDF iterator to use ++ * ++ * @return Iterator ++ */ ++ protected abstract PipedRDFIterator<TValue> getPipedIterator(); ++ ++ /** ++ * Gets the RDF stream to parse to ++ * ++ * @param iterator ++ * Iterator ++ * @return RDF stream ++ */ ++ protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input); ++ ++ /** ++ * Gets the RDF language to use for parsing ++ * ++ * @return ++ */ ++ protected abstract Lang getRdfLanguage(); ++ ++ /** ++ * Creates the runnable upon which the parsing will run ++ * ++ * @param input ++ * Input ++ * @param stream ++ * Stream ++ * @param lang ++ * Language to use for parsing ++ * @return Parser runnable ++ */ ++ private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractBlockBasedNodeTupleReader reader, final InputStream input, ++ final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) { ++ return new Runnable() { ++ @Override ++ public void run() { ++ try { ++ ReaderRIOT riotReader = RDFDataMgr.createReader(lang); ++ riotReader.setParserProfile(profile); ++ riotReader.read(input, null, lang.getContentType(), stream, null); ++ //RDFDataMgr.parse(stream, input, null, lang); ++ reader.setParserFinished(null); ++ } catch (Throwable e) { ++ reader.setParserFinished(e); ++ } ++ } ++ }; ++ } ++ ++ /** 
++ * Sets the parser thread finished state ++ * ++ * @param e ++ * Error (if any) ++ */ ++ private void setParserFinished(Throwable e) { ++ synchronized (this.parserThread) { ++ this.parserError = e; ++ this.parserFinished = true; ++ } ++ } ++ ++ /** ++ * Waits for the parser thread to have reported as finished ++ * ++ * @throws InterruptedException ++ */ ++ private void waitForParserFinished() throws InterruptedException { ++ do { ++ synchronized (this.parserThread) { ++ if (this.parserFinished) ++ return; ++ } ++ Thread.sleep(50); ++ } while (true); ++ } ++ ++ /** ++ * Creates an instance of a writable tuple from the given tuple value ++ * ++ * @param tuple ++ * Tuple value ++ * @return Writable tuple ++ */ ++ protected abstract T createInstance(TValue tuple); ++ ++ @Override ++ public boolean nextKeyValue() throws IOException { ++ // Reuse key for efficiency ++ if (key == null) { ++ key = new LongWritable(); ++ } ++ ++ if (this.finished) ++ return false; ++ ++ try { ++ if (this.iter.hasNext()) { ++ // Position will be relative to the start for the split we're ++ // processing ++ Long l = this.start + this.stream.getPosition(); ++ if (l != null) { ++ this.key.set(l); ++ // For compressed input the actual length from which we ++ // calculate progress is likely less than the actual ++ // uncompressed length so we need to increment the ++ // length as we go along ++ // We always add 1 more than the current length because we ++ // don't want to report 100% progress until we really have ++ // finished ++ if (this.compressionCodecs != null && l > this.length) ++ this.length = l + 1; ++ } ++ this.tuple = this.createInstance(this.iter.next()); ++ return true; ++ } else { ++ // Need to ensure that the parser thread has finished in order ++ // to determine whether we finished without error ++ this.waitForParserFinished(); ++ if (this.parserError != null) { ++ LOG.error("Error parsing block, aborting further parsing", this.parserError); ++ if (!this.ignoreBadTuples) ++ 
throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", ++ this.parserError); ++ } ++ ++ this.key = null; ++ this.tuple = null; ++ this.finished = true; ++ // This is necessary so that when compressed input is used we ++ // report 100% progress once we've reached the genuine end of ++ // the stream ++ if (this.compressionCodecs != null) ++ this.length--; ++ return false; ++ } ++ } catch (IOException e) { ++ throw e; ++ } catch (Throwable e) { ++ // Failed to read the tuple on this line ++ LOG.error("Error parsing block, aborting further parsing", e); ++ if (!this.ignoreBadTuples) { ++ this.iter.close(); ++ throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", e); ++ } ++ this.key = null; ++ this.tuple = null; ++ this.finished = true; ++ return false; ++ } ++ } ++ ++ @Override ++ public LongWritable getCurrentKey() { ++ return this.key; ++ } ++ ++ @Override ++ public T getCurrentValue() { ++ return this.tuple; ++ } ++ ++ @Override ++ public float getProgress() { ++ float progress = 0.0f; ++ if (this.key == null) { ++ // We've either not started or we've finished ++ progress = (this.finished ? 1.0f : 0.0f); ++ } else if (this.key.get() == Long.MIN_VALUE) { ++ // We don't have a position so we've either in-progress or finished ++ progress = (this.finished ? 
1.0f : 0.5f); ++ } else { ++ // We're some way through the file ++ progress = (this.key.get() - this.start) / (float) this.length; ++ } ++ LOG.debug("getProgress() --> {}", progress); ++ return progress; ++ } ++ ++ @Override ++ public void close() throws IOException { ++ this.iter.close(); ++ this.input.close(); ++ this.finished = true; ++ } ++ ++} http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java ---------------------------------------------------------------------- diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java index c6b2a6f,c6b2a6f..adc431f --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java @@@ -1,29 -1,29 +1,29 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. 
See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ package org.apache.jena.hadoop.rdf.io.input.readers; --import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; --import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream; --import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; --import org.apache.jena.hadoop.rdf.types.QuadWritable; ++import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; ++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream; ++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; ++import org.apache.jena.hadoop.rdf.types.QuadWritable; import org.apache.jena.riot.lang.PipedRDFIterator; --import org.apache.jena.sparql.core.Quad ; ++import org.apache.jena.sparql.core.Quad ; /** * An abstract record reader for whole file triple formats http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java ---------------------------------------------------------------------- diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java index 384b223,384b223..43a171c --- 
a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java @@@ -1,28 -1,28 +1,28 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ package org.apache.jena.hadoop.rdf.io.input.readers; --import org.apache.jena.graph.Triple ; --import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; --import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; --import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream; --import org.apache.jena.hadoop.rdf.types.TripleWritable; ++import org.apache.jena.graph.Triple ; ++import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; ++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; ++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream; ++import org.apache.jena.hadoop.rdf.types.TripleWritable; import org.apache.jena.riot.lang.PipedRDFIterator; /** http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java ---------------------------------------------------------------------- diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java index 04adb3b,04adb3b..87d2e06 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java @@@ -1,265 -1,265 +1,265 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. 
You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.jena.hadoop.rdf.io.input.readers; -- --import java.io.IOException; --import java.util.Iterator; --import org.apache.hadoop.conf.Configuration; --import org.apache.hadoop.fs.FSDataInputStream; --import org.apache.hadoop.fs.FileSystem; --import org.apache.hadoop.fs.Path; --import org.apache.hadoop.io.LongWritable; --import org.apache.hadoop.io.Text; --import org.apache.hadoop.io.compress.CompressionCodec; --import org.apache.hadoop.io.compress.CompressionCodecFactory; --import org.apache.hadoop.mapreduce.InputSplit; --import org.apache.hadoop.mapreduce.RecordReader; --import org.apache.hadoop.mapreduce.TaskAttemptContext; --import org.apache.hadoop.mapreduce.lib.input.FileSplit; --import org.apache.hadoop.util.LineReader; --import org.apache.jena.hadoop.rdf.io.HadoopIOConstants; --import org.apache.jena.hadoop.rdf.io.RdfIOConstants; --import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils; --import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; --import org.apache.jena.riot.system.ParserProfile; --import org.slf4j.Logger; --import org.slf4j.LoggerFactory; -- --/** -- * An abstract implementation of a record reader that reads records from line -- * based tuple formats. This only supports reading from file splits currently. 
-- * <p> -- * The keys produced are the position of the line in the file and the values -- * will be node tuples -- * </p> -- * -- * -- * -- * @param <TValue> -- * @param <T> -- * Writable tuple type -- */ --public abstract class AbstractLineBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> { -- -- private static final Logger LOG = LoggerFactory.getLogger(AbstractLineBasedNodeTupleReader.class); -- private CompressionCodecFactory compressionCodecs = null; -- private long start, pos, end, estLength; -- private int maxLineLength; -- private LineReader in; -- private LongWritable key = null; -- private Text value = null; -- private T tuple = null; -- private ParserProfile profile = null; -- private boolean ignoreBadTuples = true; -- -- @Override -- public final void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { -- LOG.debug("initialize({}, {})", genericSplit, context); -- -- // Assuming file split -- if (!(genericSplit instanceof FileSplit)) -- throw new IOException("This record reader only supports FileSplit inputs"); -- FileSplit split = (FileSplit) genericSplit; -- -- // Configuration -- profile = RdfIOUtils.createParserProfile(context, split.getPath()); -- Configuration config = context.getConfiguration(); -- this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true); -- if (this.ignoreBadTuples) -- LOG.warn( -- "Configured to ignore bad tuples, parsing errors will be logged and the bad line skipped but no errors will be thrownConsider setting {} to false to disable this behaviour", -- RdfIOConstants.INPUT_IGNORE_BAD_TUPLES); -- -- // Figure out what portion of the file to read -- this.maxLineLength = config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE); -- start = split.getStart(); -- end = start + split.getLength(); -- final Path file = split.getPath(); -- long totalLength = 
file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen(); -- compressionCodecs = new CompressionCodecFactory(config); -- final CompressionCodec codec = compressionCodecs.getCodec(file); -- -- LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength })); -- -- // Open the file and seek to the start of the split -- FileSystem fs = file.getFileSystem(config); -- FSDataInputStream fileIn = fs.open(file); -- boolean skipFirstLine = false; -- if (codec != null) { -- // Compressed input -- // For compressed input NLineInputFormat will have failed to find -- // any line breaks and will give us a split from 0 -> (length - 1) -- // Add 1 and verify we got complete split -- if (totalLength > split.getLength() + 1) -- throw new IOException("This record reader can only be used with compressed input where the split covers the whole file"); -- in = new LineReader(codec.createInputStream(fileIn), config); -- estLength = end; -- end = Long.MAX_VALUE; -- } else { -- // Uncompressed input -- if (start != 0) { -- skipFirstLine = true; -- --start; -- fileIn.seek(start); -- } -- in = new LineReader(fileIn, config); -- } -- // Skip first line and re-establish "start". 
-- // This is to do with how line reader reads lines and how -- // NLineInputFormat will provide the split information to use -- if (skipFirstLine) { -- start += in.readLine(new Text(), 0, (int) Math.min(Integer.MAX_VALUE, end - start)); -- } -- this.pos = start; -- } -- -- /** -- * Gets an iterator over the data on the current line -- * -- * @param line -- * Line -- * @param profile -- * Parser profile -- * @return Iterator -- */ -- protected abstract Iterator<TValue> getIterator(String line, ParserProfile profile); -- -- /** -- * Creates an instance of a writable tuple from the given tuple value -- * -- * @param tuple -- * Tuple value -- * @return Writable tuple -- */ -- protected abstract T createInstance(TValue tuple); -- -- @Override -- public final boolean nextKeyValue() throws IOException { -- // Reuse key for efficiency -- if (key == null) { -- key = new LongWritable(); -- } -- -- // Reset value which we use for reading lines -- if (value == null) { -- value = new Text(); -- } -- tuple = null; -- -- // Try to read the next valid line -- int newSize = 0; -- while (pos < end) { -- // Read next line -- newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); -- -- // Once we get an empty line we've reached the end of our input -- if (newSize == 0) { -- break; -- } -- -- // Update position, remember that where inputs are compressed we may -- // be at a larger position then we expected because the length of -- // the split is likely less than the length of the data once -- // decompressed -- key.set(pos); -- pos += newSize; -- if (pos > estLength) -- estLength = pos + 1; -- -- // Skip lines that exceed the line length limit that has been set -- if (newSize >= maxLineLength) { -- LOG.warn("Skipped oversized line of size {} at position {}", newSize, (pos - newSize)); -- continue; -- } -- -- // Attempt to read the tuple from current line -- try { -- Iterator<TValue> iter = this.getIterator(value.toString(), 
profile); -- if (iter.hasNext()) { -- tuple = this.createInstance(iter.next()); -- -- // If we reach here we've found a valid tuple so we can -- // break out of the loop -- break; -- } else { -- // Empty line/Comment line -- LOG.debug("Valid line with no triple at position {}", (pos - newSize)); -- continue; -- } -- } catch (Throwable e) { -- // Failed to read the tuple on this line -- LOG.error("Bad tuple at position " + (pos - newSize), e); -- if (this.ignoreBadTuples) -- continue; -- throw new IOException(String.format("Bad tuple at position %d", (pos - newSize)), e); -- } -- } -- boolean result = this.tuple != null; -- -- // End of input -- if (newSize == 0) { -- key = null; -- value = null; -- tuple = null; -- result = false; -- estLength = pos; -- } -- LOG.debug("nextKeyValue() --> {}", result); -- return result; -- } -- -- @Override -- public LongWritable getCurrentKey() { -- LOG.debug("getCurrentKey() --> {}", key); -- return key; -- } -- -- @Override -- public T getCurrentValue() { -- LOG.debug("getCurrentValue() --> {}", tuple); -- return tuple; -- } -- -- @Override -- public float getProgress() { -- float progress = 0.0f; -- if (start != end) { -- if (end == Long.MAX_VALUE) { -- if (estLength == 0) -- return 1.0f; -- // Use estimated length -- progress = Math.min(1.0f, (pos - start) / (float) (estLength - start)); -- } else { -- // Use actual length -- progress = Math.min(1.0f, (pos - start) / (float) (end - start)); -- } -- } -- LOG.debug("getProgress() --> {}", progress); -- return progress; -- } -- -- @Override -- public void close() throws IOException { -- LOG.debug("close()"); -- if (in != null) { -- in.close(); -- } -- } -- ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.readers;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;
import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
import org.apache.jena.riot.system.ParserProfile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An abstract implementation of a record reader that reads records from line
 * based tuple formats. This only supports reading from file splits currently.
 * <p>
 * The keys produced are the position of the line in the file and the values
 * will be node tuples
 * </p>
 *
 * @param <TValue>
 *            Tuple type
 * @param <T>
 *            Writable tuple type
 */
public abstract class AbstractLineBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractLineBasedNodeTupleReader.class);
    private CompressionCodecFactory compressionCodecs = null;
    private long start, pos, end, estLength;
    private int maxLineLength;
    private LineReader in;
    private LongWritable key = null;
    private Text value = null;
    private T tuple = null;
    private ParserProfile profile = null;
    private boolean ignoreBadTuples = true;

    /**
     * Prepares the reader for the given split: reads the configuration, works
     * out which portion of the file to read and opens a line reader on it.
     *
     * @param genericSplit
     *            Input split, must be a {@link FileSplit}
     * @param context
     *            Task attempt context which supplies the configuration
     * @throws IOException
     *             If the split is not a {@link FileSplit}, if the input is
     *             compressed but the split does not cover the whole file, or
     *             if the file cannot be opened and positioned
     */
    @Override
    public final void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        LOG.debug("initialize({}, {})", genericSplit, context);

        // Assuming file split
        if (!(genericSplit instanceof FileSplit)) {
            throw new IOException("This record reader only supports FileSplit inputs");
        }
        FileSplit split = (FileSplit) genericSplit;

        // Configuration
        profile = RdfIOUtils.createParserProfile(context, split.getPath());
        Configuration config = context.getConfiguration();
        this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
        if (this.ignoreBadTuples) {
            LOG.warn(
                    "Configured to ignore bad tuples, parsing errors will be logged and the bad line skipped but no errors will be thrown. Consider setting {} to false to disable this behaviour",
                    RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);
        }

        // Figure out what portion of the file to read
        this.maxLineLength = config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        // Resolve the file system once and reuse it for both the status
        // lookup and the open below
        final FileSystem fs = file.getFileSystem(config);
        long totalLength = fs.getFileStatus(file).getLen();
        compressionCodecs = new CompressionCodecFactory(config);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        LOG.info("Got split with start {} and length {} for file with total length of {}", new Object[] { start, split.getLength(), totalLength });

        // Compressed input
        // For compressed input NLineInputFormat will have failed to find
        // any line breaks and will give us a split from 0 -> (length - 1)
        // Add 1 and verify we got complete split
        // This is checked before the file is opened so that rejecting the
        // split cannot leave a stream open
        if (codec != null && totalLength > split.getLength() + 1) {
            throw new IOException("This record reader can only be used with compressed input where the split covers the whole file");
        }

        // Open the file and seek to the start of the split
        FSDataInputStream fileIn = fs.open(file);
        boolean ready = false;
        try {
            boolean skipFirstLine = false;
            if (codec != null) {
                // Compressed input, the split length is only an estimate of
                // how much data there is once decompressed
                in = new LineReader(codec.createInputStream(fileIn), config);
                estLength = end;
                end = Long.MAX_VALUE;
            } else {
                // Uncompressed input
                if (start != 0) {
                    skipFirstLine = true;
                    --start;
                    fileIn.seek(start);
                }
                in = new LineReader(fileIn, config);
            }
            // Skip first line and re-establish "start".
            // This is to do with how line reader reads lines and how
            // NLineInputFormat will provide the split information to use
            if (skipFirstLine) {
                start += in.readLine(new Text(), 0, (int) Math.min(Integer.MAX_VALUE, end - start));
            }
            ready = true;
        } finally {
            if (!ready) {
                // Set up failed part way through, release the stream rather
                // than leaking it and make sure close() does not touch a
                // half initialized reader
                in = null;
                try {
                    fileIn.close();
                } catch (IOException closeError) {
                    LOG.debug("Failed to close input stream after initialization error", closeError);
                }
            }
        }
        this.pos = start;
    }

    /**
     * Gets an iterator over the data on the current line
     *
     * @param line
     *            Line
     * @param profile
     *            Parser profile
     * @return Iterator
     */
    protected abstract Iterator<TValue> getIterator(String line, ParserProfile profile);

    /**
     * Creates an instance of a writable tuple from the given tuple value
     *
     * @param tuple
     *            Tuple value
     * @return Writable tuple
     */
    protected abstract T createInstance(TValue tuple);

    /**
     * Advances to the next line that yields a tuple, skipping oversized
     * lines, lines that contain no tuple and (when configured to ignore bad
     * tuples) lines that fail to parse.
     *
     * @return True if a tuple was read, false if there is no further tuple in
     *         this split
     * @throws IOException
     *             If reading fails, or if a line fails to parse and bad
     *             tuples are not being ignored
     */
    @Override
    public final boolean nextKeyValue() throws IOException {
        // Reuse key for efficiency
        if (key == null) {
            key = new LongWritable();
        }

        // Reset value which we use for reading lines
        if (value == null) {
            value = new Text();
        }
        tuple = null;

        // Try to read the next valid line
        while (pos < end) {
            // Read next line
            int newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));

            // Once we get an empty line we've reached the end of our input
            if (newSize == 0) {
                break;
            }

            // Update position, remember that where inputs are compressed we
            // may be at a larger position than we expected because the length
            // of the split is likely less than the length of the data once
            // decompressed
            long lineStart = pos;
            key.set(lineStart);
            pos += newSize;
            if (pos > estLength) {
                estLength = pos + 1;
            }

            // Skip lines that exceed the line length limit that has been set
            if (newSize >= maxLineLength) {
                LOG.warn("Skipped oversized line of size {} at position {}", newSize, lineStart);
                continue;
            }

            // Attempt to read the tuple from current line
            try {
                Iterator<TValue> iter = this.getIterator(value.toString(), profile);
                if (iter.hasNext()) {
                    tuple = this.createInstance(iter.next());

                    // If we reach here we've found a valid tuple so we can
                    // break out of the loop
                    break;
                } else {
                    // Empty line/Comment line
                    LOG.debug("Valid line with no triple at position {}", lineStart);
                }
            } catch (Throwable e) {
                // Failed to read the tuple on this line
                LOG.error("Bad tuple at position " + lineStart, e);
                if (!this.ignoreBadTuples) {
                    throw new IOException(String.format("Bad tuple at position %d", lineStart), e);
                }
            }
        }
        boolean result = this.tuple != null;

        // End of input, this covers both the reader reporting no more data
        // and the split being exhausted after skipping lines, so that a stale
        // key is never exposed once there is nothing further to return
        if (!result) {
            key = null;
            value = null;
            tuple = null;
            estLength = pos;
        }
        LOG.debug("nextKeyValue() --> {}", result);
        return result;
    }

    /**
     * Gets the key for the current tuple which is the position of its line
     *
     * @return Current key, null once the end of the input has been reached
     */
    @Override
    public LongWritable getCurrentKey() {
        LOG.debug("getCurrentKey() --> {}", key);
        return key;
    }

    /**
     * Gets the current tuple
     *
     * @return Current tuple, null if there is no current tuple
     */
    @Override
    public T getCurrentValue() {
        LOG.debug("getCurrentValue() --> {}", tuple);
        return tuple;
    }

    /**
     * Gets the progress through the split, where the real end is unknown
     * (compressed input) the estimated length is used instead
     *
     * @return Progress between 0 and 1
     */
    @Override
    public float getProgress() {
        float progress = 0.0f;
        if (start != end) {
            if (end == Long.MAX_VALUE) {
                if (estLength == 0) {
                    return 1.0f;
                }
                // Use estimated length
                progress = Math.min(1.0f, (pos - start) / (float) (estLength - start));
            } else {
                // Use actual length
                progress = Math.min(1.0f, (pos - start) / (float) (end - start));
            }
        }
        LOG.debug("getProgress() --> {}", progress);
        return progress;
    }

    /**
     * Closes the underlying line reader if one was opened
     *
     * @throws IOException
     *             If closing the reader fails
     */
    @Override
    public void close() throws IOException {
        LOG.debug("close()");
        if (in != null) {
            in.close();
        }
    }
}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java ---------------------------------------------------------------------- diff --cc
jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java index 84c168d,84c168d..8aec04c --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java @@@ -1,29 -1,29 +1,29 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ package org.apache.jena.hadoop.rdf.io.input.readers; import java.util.Iterator; -- --import org.apache.jena.hadoop.rdf.types.QuadWritable; ++ ++import org.apache.jena.hadoop.rdf.types.QuadWritable; import org.apache.jena.riot.system.ParserProfile; import org.apache.jena.riot.tokens.Tokenizer; --import org.apache.jena.sparql.core.Quad ; ++import org.apache.jena.sparql.core.Quad ; /** * An abstract reader for line based quad formats http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java ---------------------------------------------------------------------- diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java index 780630d,780630d..a67e5f9 --- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java @@@ -1,27 -1,27 +1,27 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ package org.apache.jena.hadoop.rdf.io.input.readers; import java.util.Iterator; -- --import org.apache.jena.graph.Triple ; --import org.apache.jena.hadoop.rdf.types.TripleWritable; ++ ++import org.apache.jena.graph.Triple ; ++import org.apache.jena.hadoop.rdf.types.TripleWritable; import org.apache.jena.riot.system.ParserProfile; import org.apache.jena.riot.tokens.Tokenizer;
