/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.jsonld;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.jsonld.JsonLDQuadReader;
import org.apache.jena.hadoop.rdf.types.QuadWritable;

/**
 * JSON-LD input format for quads.
 * <p>
 * Files are processed as complete files (this extends
 * {@link AbstractWholeFileInputFormat}) so the input cannot be split over
 * multiple mappers.
 * </p>
 */
public class JsonLDQuadInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {

    /**
     * Creates the record reader that parses a whole JSON-LD file into quads,
     * keyed by approximate byte position.
     */
    @Override
    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new JsonLDQuadReader();
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.jsonld;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.jsonld.JsonLDTripleReader;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

/**
 * JSON-LD input format for triples.
 * <p>
 * Files are processed as complete files (this extends
 * {@link AbstractWholeFileInputFormat}) so the input cannot be split over
 * multiple mappers.
 * </p>
 */
public class JsonLDTripleInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {

    /**
     * Creates the record reader that parses a whole JSON-LD file into
     * triples, keyed by approximate byte position.
     */
    @Override
    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new JsonLDTripleReader();
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.nquads;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.nquads.BlockedNQuadsReader;
import org.apache.jena.hadoop.rdf.types.QuadWritable;

/**
 * NQuads input format where files are processed as blocks of lines rather
 * than in a line based manner as with the {@link NQuadsInputFormat} or as
 * whole files with the {@link WholeFileNQuadsInputFormat}
 * <p>
 * This provides a compromise between the higher parser setup overhead of
 * creating more parsers and the benefit of being able to split input files
 * over multiple mappers.
 * </p>
 */
public class BlockedNQuadsInputFormat extends AbstractNLineFileInputFormat<LongWritable, QuadWritable> {

    /**
     * Creates the record reader that parses blocks of NQuads lines, keyed by
     * approximate byte position.
     */
    @Override
    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new BlockedNQuadsReader();
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.nquads;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.nquads.NQuadsReader;
import org.apache.jena.hadoop.rdf.types.QuadWritable;

/**
 * NQuads input format where files are processed line by line.
 */
public class NQuadsInputFormat extends AbstractNLineFileInputFormat<LongWritable, QuadWritable> {

    /**
     * Creates the record reader that parses NQuads line by line, keyed by
     * approximate byte position.
     */
    @Override
    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new NQuadsReader();
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.nquads;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.nquads.WholeFileNQuadsReader;
import org.apache.jena.hadoop.rdf.types.QuadWritable;

/**
 * NQuads input format which treats each input file as a single unit of work,
 * in contrast to the line based processing of the {@link NQuadsInputFormat}.
 * <p>
 * Processing whole files reduces parser setup overhead at the cost of losing
 * the ability to split a single input file across multiple mappers.
 * </p>
 */
public class WholeFileNQuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {

    /**
     * Returns a reader that parses an entire NQuads file in one pass.
     */
    @Override
    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
            throws IOException, InterruptedException {
        return new WholeFileNQuadsReader();
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.ntriples;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.BlockedNTriplesReader;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

/**
 * NTriples input format which consumes files as blocks of lines, sitting
 * between the line based {@link NTriplesInputFormat} and the whole file
 * {@link WholeFileNTriplesInputFormat}.
 * <p>
 * Blocks keep parser setup overhead lower than per-line parsing while still
 * allowing an input file to be split across multiple mappers.
 * </p>
 */
public class BlockedNTriplesInputFormat extends AbstractNLineFileInputFormat<LongWritable, TripleWritable> {

    /**
     * Returns a reader that parses NTriples in line blocks.
     */
    @Override
    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
            throws IOException, InterruptedException {
        return new BlockedNTriplesReader();
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.ntriples;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.NTriplesReader;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

/**
 * NTriples input format where files are processed line by line.
 */
public class NTriplesInputFormat extends AbstractNLineFileInputFormat<LongWritable, TripleWritable> {

    /**
     * Returns a reader that parses NTriples line by line.
     */
    @Override
    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext taskContext)
            throws IOException, InterruptedException {
        return new NTriplesReader();
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.ntriples;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.WholeFileNTriplesReader;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

/**
 * NTriples input format which treats each input file as a single unit of
 * work, in contrast to the line based processing of the
 * {@link NTriplesInputFormat}.
 * <p>
 * Processing whole files reduces parser setup overhead at the cost of losing
 * the ability to split a single input file across multiple mappers.
 * </p>
 */
public class WholeFileNTriplesInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {

    /**
     * Returns a reader that parses an entire NTriples file in one pass.
     */
    @Override
    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
            throws IOException, InterruptedException {
        return new WholeFileNTriplesReader();
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.rdfjson;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.rdfjson.RdfJsonReader;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

/**
 * RDF/JSON input format; files are processed as whole files (this extends
 * {@link AbstractWholeFileInputFormat}) and so cannot be split over multiple
 * mappers.
 */
public class RdfJsonInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {

    /**
     * Returns a reader that parses a whole RDF/JSON file into triples.
     */
    @Override
    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
            throws IOException, InterruptedException {
        return new RdfJsonReader();
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.rdfxml;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.rdfxml.RdfXmlReader;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

/**
 * RDF/XML input format; files are processed as whole files (this extends
 * {@link AbstractWholeFileInputFormat}) and so cannot be split over multiple
 * mappers.
 */
public class RdfXmlInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {

    /**
     * Returns a reader that parses a whole RDF/XML file into triples.
     */
    @Override
    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
            throws IOException, InterruptedException {
        return new RdfXmlReader();
    }

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.readers; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.hadoop.rdf.io.input.util.BlockInputStream; +import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils; +import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; +import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFDataMgr; +import org.apache.jena.riot.ReaderRIOT; +import org.apache.jena.riot.lang.PipedRDFIterator; +import org.apache.jena.riot.lang.PipedRDFStream; +import org.apache.jena.riot.system.ParserProfile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An abstract implementation for a record reader that reads records from blocks + * of files, this is a hybrid 
between {@link AbstractLineBasedNodeTupleReader}
 * and {@link AbstractWholeFileNodeTupleReader} in that it can only be used by
 * formats which can be split by lines but reduces the overhead by parsing the
 * split as a whole rather than as individual lines.
 * <p>
 * The keys produced are the approximate position in the file at which a tuple
 * was found and the values will be node tuples. Positions are approximate
 * because they are recorded after the point at which the most recent tuple was
 * parsed from the input thus they reflect the approximate position in the
 * stream immediately after which the triple was found.
 * </p>
 *
 * @param <TValue>
 *            Value type
 * @param <T>
 *            Tuple type
 */
public abstract class AbstractBlockBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractBlockBasedNodeTupleReader.class);
    // NOTE(review): despite the plural name this holds the single codec (if any)
    // detected for the input file, not a factory
    private CompressionCodec compressionCodecs;
    // Input stream whose consumed-byte count we can track for key positions
    private TrackableInputStream input;
    private LongWritable key;
    // start = offset of this split in the file; length = split length, grown
    // on the fly for compressed input (see nextKeyValue())
    private long start, length;
    private T tuple;
    // Piped parser plumbing: the background thread pushes into stream, we pull
    // from iter on the caller's thread
    private TrackedPipedRDFStream<TValue> stream;
    private PipedRDFIterator<TValue> iter;
    private Thread parserThread;
    private boolean finished = false;
    private boolean ignoreBadTuples = true;
    // Parser-thread completion state, guarded by synchronizing on parserThread
    private boolean parserFinished = false;
    private Throwable parserError = null;

    /**
     * Initializes the reader for the given file split: opens the (possibly
     * compressed) input, wraps it in a tracked stream and starts a daemon
     * thread that parses the block and pushes tuples into the piped iterator.
     *
     * @param genericSplit
     *            Input split, must be a {@link FileSplit}
     * @param context
     *            Task attempt context providing the configuration
     * @throws IOException
     *             If the split is not a file split, or if compressed input is
     *             not presented as a whole-file split
     */
    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        LOG.debug("initialize({}, {})", genericSplit, context);

        // Assuming file split
        if (!(genericSplit instanceof FileSplit))
            throw new IOException("This record reader only supports FileSplit inputs");
        FileSplit split = (FileSplit) genericSplit;

        // Configuration
        Configuration config = context.getConfiguration();
        this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
        if (this.ignoreBadTuples)
            LOG.warn(
                    "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown. Consider setting {} to false to disable this behaviour",
                    RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);

        // Figure out what portion of the file to read
        start = split.getStart();
        long end = start + split.getLength();
        final Path file = split.getPath();
        long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
        boolean readToEnd = end == totalLength;
        CompressionCodecFactory factory = new CompressionCodecFactory(config);
        this.compressionCodecs = factory.getCodec(file);

        LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength }));

        // Open the file and prepare the input stream
        FileSystem fs = file.getFileSystem(config);
        FSDataInputStream fileIn = fs.open(file);
        this.length = split.getLength();
        if (start > 0)
            fileIn.seek(start);

        if (this.compressionCodecs != null) {
            // Compressed input
            // For compressed input NLineInputFormat will have failed to find
            // any line breaks and will give us a split from 0 -> (length - 1)
            // Add 1 and re-verify readToEnd so we can abort correctly if ever
            // given a partial split of a compressed file
            end++;
            readToEnd = end == totalLength;
            if (start > 0 || !readToEnd)
                throw new IOException("This record reader can only be used with compressed input where the split is a whole file");
            input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn));
        } else {
            // Uncompressed input

            if (readToEnd) {
                input = new TrackedInputStream(fileIn);
            } else {
                // Need to limit the portion of the file we are reading
                input = new BlockInputStream(fileIn, split.getLength());
            }
        }

        // Set up background thread for parser
        iter = this.getPipedIterator();
        this.stream = this.getPipedStream(iter, this.input);
        ParserProfile profile = RdfIOUtils.createParserProfile(context, file);
        Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile);
        this.parserThread = new Thread(parserRunnable);
        // Daemon so a hung parser cannot prevent JVM shutdown
        this.parserThread.setDaemon(true);
        this.parserThread.start();
    }

    /**
     * Gets the RDF iterator to use
     *
     * @return Iterator
     */
    protected abstract PipedRDFIterator<TValue> getPipedIterator();

    /**
     * Gets the RDF stream to parse to
     *
     * @param iterator
     *            Iterator the stream feeds
     * @param input
     *            Tracked input stream used to report positions
     * @return RDF stream
     */
    protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input);

    /**
     * Gets the RDF language to use for parsing
     *
     * @return RDF language
     */
    protected abstract Lang getRdfLanguage();

    /**
     * Creates the runnable upon which the parsing will run
     *
     * @param reader
     *            Reader to notify when parsing finishes (or fails)
     * @param input
     *            Input
     * @param stream
     *            Stream
     * @param lang
     *            Language to use for parsing
     * @param profile
     *            Parser profile
     * @return Parser runnable
     */
    private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractBlockBasedNodeTupleReader reader, final InputStream input,
            final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    ReaderRIOT riotReader = RDFDataMgr.createReader(lang);
                    riotReader.setParserProfile(profile);
                    riotReader.read(input, null, lang.getContentType(), stream, null);
                    // null error signals a clean finish
                    reader.setParserFinished(null);
                } catch (Throwable e) {
                    reader.setParserFinished(e);
                }
            }
        };
    }

    /**
     * Sets the parser thread finished state
     *
     * @param e
     *            Error (if any)
     */
    private void setParserFinished(Throwable e) {
        synchronized (this.parserThread) {
            this.parserError = e;
            this.parserFinished = true;
        }
    }

    /**
     * Waits for the parser thread to have reported as finished
     *
     * @throws InterruptedException
     */
    private void waitForParserFinished() throws InterruptedException {
        // Polls the finished flag under the same lock setParserFinished() uses
        do {
            synchronized (this.parserThread) {
                if (this.parserFinished)
                    return;
            }
            Thread.sleep(50);
        } while (true);
    }

    /**
     * Creates an instance of a writable tuple from the given tuple value
     *
     * @param tuple
     *            Tuple value
     * @return Writable tuple
     */
    protected abstract T createInstance(TValue tuple);

    /**
     * Advances to the next tuple pulled from the piped iterator, keying it by
     * its approximate position in the input. Returns {@code false} once the
     * parser thread has finished and the iterator is drained.
     *
     * @return True if a tuple was read, false at end of input
     * @throws IOException
     *             If a parse error occurred and bad tuples are not ignored
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Reuse key for efficiency
        if (key == null) {
            key = new LongWritable();
        }

        if (this.finished)
            return false;

        try {
            if (this.iter.hasNext()) {
                // Position will be relative to the start for the split we're
                // processing
                // NOTE(review): l is produced by long arithmetic so it can
                // never be null here; if stream.getPosition() may return null
                // this line would NPE on unboxing before the check below runs
                // — confirm the TrackedPipedRDFStream.getPosition() contract
                Long l = this.start + this.stream.getPosition();
                if (l != null) {
                    this.key.set(l);
                    // For compressed input the actual length from which we
                    // calculate progress is likely less than the actual
                    // uncompressed length so we need to increment the
                    // length as we go along
                    // We always add 1 more than the current length because we
                    // don't want to report 100% progress until we really have
                    // finished
                    if (this.compressionCodecs != null && l > this.length)
                        this.length = l + 1;
                }
                this.tuple = this.createInstance(this.iter.next());
                return true;
            } else {
                // Need to ensure that the parser thread has finished in order
                // to determine whether we finished without error
                this.waitForParserFinished();
                if (this.parserError != null) {
                    LOG.error("Error parsing block, aborting further parsing", this.parserError);
                    if (!this.ignoreBadTuples)
                        throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing",
                                this.parserError);
                }

                this.key = null;
                this.tuple = null;
                this.finished = true;
                // This is necessary so that when compressed input is used we
                // report 100% progress once we've reached the genuine end of
                // the stream
                if (this.compressionCodecs != null)
                    this.length--;
                return false;
            }
        } catch (IOException e) {
            throw e;
        } catch (Throwable e) {
            // Failed to read the tuple on this line
            LOG.error("Error parsing block, aborting further parsing", e);
            if (!this.ignoreBadTuples) {
                this.iter.close();
                throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", e);
            }
            this.key = null;
            this.tuple = null;
            this.finished = true;
            return false;
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    @Override
    public T getCurrentValue() throws IOException, InterruptedException {
        return this.tuple;
    }

    /**
     * Reports progress based on the current key position relative to the
     * (possibly estimated) length of the split.
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        float progress = 0.0f;
        if (this.key == null) {
            // We've either not started or we've finished
            progress = (this.finished ? 1.0f : 0.0f);
        } else if (this.key.get() == Long.MIN_VALUE) {
            // We don't have a position so we've either in-progress or finished
            progress = (this.finished ? 1.0f : 0.5f);
        } else {
            // We're some way through the file
            progress = (this.key.get() - this.start) / (float) this.length;
        }
        LOG.debug("getProgress() --> {}", progress);
        return progress;
    }

    @Override
    public void close() throws IOException {
        // Closing the iterator also unblocks the parser thread if it is still
        // trying to push tuples into the pipe
        this.iter.close();
        this.input.close();
        this.finished = true;
    }

}
+ */ + +package org.apache.jena.hadoop.rdf.io.input.readers; + +import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.riot.lang.PipedRDFIterator; + +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * An abstract record reader for whole file triple formats + * + * + * + */ +public abstract class AbstractBlockBasedQuadReader extends AbstractBlockBasedNodeTupleReader<Quad, QuadWritable> { + + @Override + protected PipedRDFIterator<Quad> getPipedIterator() { + return new PipedRDFIterator<Quad>(); + } + + @Override + protected TrackedPipedRDFStream<Quad> getPipedStream(PipedRDFIterator<Quad> iterator, TrackableInputStream input) { + return new TrackedPipedQuadsStream(iterator, input); + } + + @Override + protected QuadWritable createInstance(Quad tuple) { + return new QuadWritable(tuple); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java new file mode 100644 index 0000000..2afd329 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.readers;

import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream;
import org.apache.jena.hadoop.rdf.types.TripleWritable;
import org.apache.jena.riot.lang.PipedRDFIterator;

import com.hp.hpl.jena.graph.Triple;

/**
 * An abstract record reader for block-based RDF triple formats, wiring the
 * generic block-based machinery up to triple-specific piped streams and
 * writables
 *
 *
 *
 */
public abstract class AbstractBlockBasedTripleReader extends AbstractBlockBasedNodeTupleReader<Triple, TripleWritable> {

    /** Wraps a parsed triple in its writable form */
    @Override
    protected TripleWritable createInstance(Triple triple) {
        return new TripleWritable(triple);
    }

    /** Supplies the piped iterator the parser thread feeds */
    @Override
    protected PipedRDFIterator<Triple> getPipedIterator() {
        return new PipedRDFIterator<Triple>();
    }

    /** Supplies a position-tracking triple stream over the given input */
    @Override
    protected TrackedPipedRDFStream<Triple> getPipedStream(PipedRDFIterator<Triple> iter, TrackableInputStream trackedInput) {
        return new TrackedPipedTriplesStream(iter, trackedInput);
    }
}
a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java new file mode 100644 index 0000000..6c1abe9 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.readers; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.util.LineReader; +import org.apache.jena.hadoop.rdf.io.HadoopIOConstants; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils; +import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; +import org.apache.jena.riot.system.ParserProfile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An abstract implementation of a record reader that reads records from line + * based tuple formats. This only supports reading from file splits currently. 
* <p>
 * The keys produced are the position of the line in the file and the values
 * will be node tuples
 * </p>
 *
 *
 *
 * @param <TValue>
 *            Value type
 * @param <T>
 *            Writable tuple type
 */
public abstract class AbstractLineBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractLineBasedNodeTupleReader.class);
    private CompressionCodecFactory compressionCodecs = null;
    // start/end = byte range of this split; pos = current read position;
    // estLength = estimated total length, used for progress on compressed input
    private long start, pos, end, estLength;
    private int maxLineLength;
    private LineReader in;
    private LongWritable key = null;
    // Scratch buffer each raw line is read into before tokenizing
    private Text value = null;
    private T tuple = null;
    private ParserProfile profile = null;
    private boolean ignoreBadTuples = true;

    /**
     * Initializes the reader for the given file split: seeks to the split
     * start, prepares a line reader (decompressing if a codec applies) and
     * skips the partial first line of non-initial splits.
     *
     * @param genericSplit
     *            Input split, must be a {@link FileSplit}
     * @param context
     *            Task attempt context providing the configuration
     * @throws IOException
     *             If the split is not a file split, or if compressed input is
     *             not presented as a whole-file split
     */
    @Override
    public final void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        LOG.debug("initialize({}, {})", genericSplit, context);

        // Assuming file split
        if (!(genericSplit instanceof FileSplit))
            throw new IOException("This record reader only supports FileSplit inputs");
        FileSplit split = (FileSplit) genericSplit;

        // Configuration
        profile = RdfIOUtils.createParserProfile(context, split.getPath());
        Configuration config = context.getConfiguration();
        this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
        if (this.ignoreBadTuples)
            // Fixed: original message ran two sentences together
            // ("...thrownConsider setting...")
            LOG.warn(
                    "Configured to ignore bad tuples, parsing errors will be logged and the bad line skipped but no errors will be thrown. Consider setting {} to false to disable this behaviour",
                    RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);

        // Figure out what portion of the file to read
        this.maxLineLength = config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
        compressionCodecs = new CompressionCodecFactory(config);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength }));

        // Open the file and seek to the start of the split
        FileSystem fs = file.getFileSystem(config);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;
        if (codec != null) {
            // Compressed input
            // For compressed input NLineInputFormat will have failed to find
            // any line breaks and will give us a split from 0 -> (length - 1)
            // Add 1 and verify we got complete split
            if (totalLength > split.getLength() + 1)
                throw new IOException("This record reader can only be used with compressed input where the split covers the whole file");
            in = new LineReader(codec.createInputStream(fileIn), config);
            estLength = end;
            end = Long.MAX_VALUE;
        } else {
            // Uncompressed input
            if (start != 0) {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            in = new LineReader(fileIn, config);
        }
        // Skip first line and re-establish "start".
        // This is to do with how line reader reads lines and how
        // NLineInputFormat will provide the split information to use
        if (skipFirstLine) {
            start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    /**
     * Gets an iterator over the data on the current line
     *
     * @param line
     *            Line
     * @param profile
     *            Parser profile
     * @return Iterator
     */
    protected abstract Iterator<TValue> getIterator(String line, ParserProfile profile);

    /**
     * Creates an instance of a writable tuple from the given tuple value
     *
     * @param tuple
     *            Tuple value
     * @return Writable tuple
     */
    protected abstract T createInstance(TValue tuple);

    /**
     * Reads lines until one yields a valid tuple, skipping oversized lines,
     * empty/comment lines and (when configured) bad tuples.
     *
     * @return True if a tuple was read, false at end of input
     * @throws IOException
     *             If a line fails to parse and bad tuples are not ignored
     */
    @Override
    public final boolean nextKeyValue() throws IOException, InterruptedException {
        // Reuse key for efficiency
        if (key == null) {
            key = new LongWritable();
        }

        // Reset value which we use for reading lines
        if (value == null) {
            value = new Text();
        }
        tuple = null;

        // Try to read the next valid line
        int newSize = 0;
        while (pos < end) {
            // Read next line
            newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));

            // Once we get an empty line we've reached the end of our input
            if (newSize == 0) {
                break;
            }

            // Update position, remember that where inputs are compressed we may
            // be at a larger position then we expected because the length of
            // the split is likely less than the length of the data once
            // decompressed
            key.set(pos);
            pos += newSize;
            if (pos > estLength)
                estLength = pos + 1;

            // Skip lines that exceed the line length limit that has been set
            if (newSize >= maxLineLength) {
                LOG.warn("Skipped oversized line of size {} at position {}", newSize, (pos - newSize));
                continue;
            }

            // Attempt to read the tuple from current line
            try {
                Iterator<TValue> iter = this.getIterator(value.toString(), profile);
                if (iter.hasNext()) {
                    tuple = this.createInstance(iter.next());

                    // If we reach here we've found a valid tuple so we can
                    // break out of the loop
                    break;
                } else {
                    // Empty line/Comment line
                    LOG.debug("Valid line with no triple at position {}", (pos - newSize));
                    continue;
                }
            } catch (Throwable e) {
                // Failed to read the tuple on this line
                LOG.error("Bad tuple at position " + (pos - newSize), e);
                if (this.ignoreBadTuples)
                    continue;
                throw new IOException(String.format("Bad tuple at position %d", (pos - newSize)), e);
            }
        }
        boolean result = this.tuple != null;

        // End of input
        if (newSize == 0) {
            key = null;
            value = null;
            tuple = null;
            result = false;
            estLength = pos;
        }
        LOG.debug("nextKeyValue() --> {}", result);
        return result;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        LOG.debug("getCurrentKey() --> {}", key);
        return key;
    }

    @Override
    public T getCurrentValue() throws IOException, InterruptedException {
        LOG.debug("getCurrentValue() --> {}", tuple);
        return tuple;
    }

    /**
     * Reports progress through the split, using the estimated length when
     * reading compressed input (where end is set to Long.MAX_VALUE).
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        float progress = 0.0f;
        if (start != end) {
            if (end == Long.MAX_VALUE) {
                if (estLength == 0)
                    return 1.0f;
                // Use estimated length
                progress = Math.min(1.0f, (pos - start) / (float) (estLength - start));
            } else {
                // Use actual length
                progress = Math.min(1.0f, (pos - start) / (float) (end - start));
            }
        }
        LOG.debug("getProgress() --> {}", progress);
        return progress;
    }

    @Override
    public void close() throws IOException {
        LOG.debug("close()");
        if (in != null) {
            in.close();
        }
    }

}
a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java new file mode 100644 index 0000000..ac93865 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.readers; + +import java.util.Iterator; + +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.riot.system.ParserProfile; +import org.apache.jena.riot.tokens.Tokenizer; +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * An abstract reader for line based quad formats + * + * + * + */ +public abstract class AbstractLineBasedQuadReader extends AbstractLineBasedNodeTupleReader<Quad, QuadWritable> { + + @Override + protected Iterator<Quad> getIterator(String line, ParserProfile profile) { + Tokenizer tokenizer = getTokenizer(line); + return getQuadsIterator(tokenizer, profile); + } + + @Override + protected QuadWritable createInstance(Quad q) { + return new QuadWritable(q); + } + + protected abstract Tokenizer getTokenizer(String line); + + protected abstract Iterator<Quad> getQuadsIterator(Tokenizer tokenizer, ParserProfile profile); +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java new file mode 100644 index 0000000..a0232f5 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.readers;

import java.util.Iterator;

import org.apache.jena.hadoop.rdf.types.TripleWritable;
import org.apache.jena.riot.system.ParserProfile;
import org.apache.jena.riot.tokens.Tokenizer;
import com.hp.hpl.jena.graph.Triple;

/**
 * An abstract record reader for line based triple formats
 *
 *
 *
 */
public abstract class AbstractLineBasedTripleReader extends AbstractLineBasedNodeTupleReader<Triple, TripleWritable> {

    /**
     * Creates a tokenizer over a single line of input
     *
     * @param line
     *            Line
     * @return Tokenizer
     */
    protected abstract Tokenizer getTokenizer(String line);

    /**
     * Creates a triple iterator over the tokenized line
     *
     * @param tokenizer
     *            Tokenizer
     * @param profile
     *            Parser profile
     * @return Triple iterator
     */
    protected abstract Iterator<Triple> getTriplesIterator(Tokenizer tokenizer, ParserProfile profile);

    /** Tokenizes the line and hands it off to the triple iterator */
    @Override
    protected Iterator<Triple> getIterator(String line, ParserProfile profile) {
        return getTriplesIterator(getTokenizer(line), profile);
    }

    /** Wraps a parsed triple in its writable form */
    @Override
    protected TripleWritable createInstance(Triple triple) {
        return new TripleWritable(triple);
    }

}
b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java new file mode 100644 index 0000000..d0ffed8 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.readers; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFLanguages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An abstract record reader for arbitrary RDF which provides support for + * selecting the actual record reader to use based on detecting the RDF language + * from the file name + * + * @param <TValue> + * Tuple type + * @param <T> + * Writable tuple type + */ +public abstract class AbstractRdfReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends + RecordReader<LongWritable, T> { + private static final Logger LOG = LoggerFactory.getLogger(AbstractRdfReader.class); + + private RecordReader<LongWritable, T> reader; + + @Override + public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, + InterruptedException { + LOG.debug("initialize({}, {})", genericSplit, context); + + // Assuming file split + if (!(genericSplit instanceof FileSplit)) + throw new IOException("This record reader only supports FileSplit inputs"); + + // Find RDF language + FileSplit split = (FileSplit) genericSplit; + Path path = split.getPath(); + Lang lang = RDFLanguages.filenameToLang(path.getName()); + if (lang == null) + throw new IOException("There is no registered RDF language for the input file " + path.toString()); + + // Select the record reader and initialize + this.reader = this.selectRecordReader(lang); + this.reader.initialize(split, context); + } + + /** + * Selects the appropriate record reader to use for the given RDF language + * + * @param 
lang + * RDF language + * @return Record reader + * @throws IOException + * Should be thrown if no record reader can be selected + */ + protected abstract RecordReader<LongWritable, T> selectRecordReader(Lang lang) throws IOException; + + @Override + public final boolean nextKeyValue() throws IOException, InterruptedException { + return this.reader.nextKeyValue(); + } + + @Override + public final LongWritable getCurrentKey() throws IOException, InterruptedException { + return this.reader.getCurrentKey(); + } + + @Override + public final T getCurrentValue() throws IOException, InterruptedException { + return this.reader.getCurrentValue(); + } + + @Override + public final float getProgress() throws IOException, InterruptedException { + return this.reader.getProgress(); + } + + @Override + public final void close() throws IOException { + this.reader.close(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java new file mode 100644 index 0000000..c2da3f7 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.readers; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils; +import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; +import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFDataMgr; +import org.apache.jena.riot.ReaderRIOT; +import org.apache.jena.riot.lang.PipedRDFIterator; +import org.apache.jena.riot.lang.PipedRDFStream; +import org.apache.jena.riot.system.ParserProfile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An abstract 
implementation for a record reader that reads records from whole + * files i.e. the whole file must be kept together to allow tuples to be + * successfully read. This only supports reading from file splits currently. + * <p> + * The keys produced are the approximate position in the file at which a tuple + * was found and the values will be node tuples. Positions are approximate + * because they are recorded after the point at which the most recent tuple was + * parsed from the input thus they reflect the approximate position in the + * stream immediately after which the triple was found. + * </p> + * <p> + * You should also be aware that with whole file formats syntax compressions in + * the format may mean that there are multiple triples produced with the same + * position and thus key. + * </p> + * + * + * + * @param <TValue> + * Value type + * @param <T> + * Tuple type + */ +public abstract class AbstractWholeFileNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractWholeFileNodeTupleReader.class); + private CompressionCodec compressionCodecs; + private TrackedInputStream input; + private LongWritable key; + private long length; + private T tuple; + private TrackedPipedRDFStream<TValue> stream; + private PipedRDFIterator<TValue> iter; + private Thread parserThread; + private boolean finished = false; + private boolean ignoreBadTuples = true; + private boolean parserFinished = false; + private Throwable parserError = null; + + @Override + public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException { + LOG.debug("initialize({}, {})", genericSplit, context); + + // Assuming file split + if (!(genericSplit instanceof FileSplit)) + throw new IOException("This record reader only supports FileSplit inputs"); + FileSplit split = (FileSplit) genericSplit; + + // Configuration + Configuration 
config = context.getConfiguration(); + this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true); + if (this.ignoreBadTuples) + LOG.warn( + "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown. Consider setting {} to false to disable this behaviour", + RdfIOConstants.INPUT_IGNORE_BAD_TUPLES); + + // Figure out what portion of the file to read + if (split.getStart() > 0) + throw new IOException("This record reader requires a file split which covers the entire file"); + final Path file = split.getPath(); + long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen(); + CompressionCodecFactory factory = new CompressionCodecFactory(config); + this.compressionCodecs = factory.getCodec(file); + + LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { split.getStart(), split.getLength(), + totalLength })); + + if (totalLength > split.getLength()) + throw new IOException("This record reader requires a file split which covers the entire file"); + + // Open the file and prepare the input stream + FileSystem fs = file.getFileSystem(config); + FSDataInputStream fileIn = fs.open(file); + this.length = split.getLength(); + if (this.compressionCodecs != null) { + // Compressed input + input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn)); + } else { + // Uncompressed input + input = new TrackedInputStream(fileIn); + } + + // Set up background thread for parser + iter = this.getPipedIterator(); + this.stream = this.getPipedStream(iter, this.input); + ParserProfile profile = RdfIOUtils.createParserProfile(context, file); + Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile); + this.parserThread = new Thread(parserRunnable); + this.parserThread.setDaemon(true); + this.parserThread.start(); + } + + /** + * 
Gets the RDF iterator to use + * + * @return Iterator + */ + protected abstract PipedRDFIterator<TValue> getPipedIterator(); + + /** + * Gets the RDF stream to parse to + * + * @param iterator + * Iterator + * @return RDF stream + */ + protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input); + + /** + * Gets the RDF language to use for parsing + * + * @return + */ + protected abstract Lang getRdfLanguage(); + + /** + * Creates the runnable upon which the parsing will run + * + * @param input + * Input + * @param stream + * Stream + * @param lang + * Language to use for parsing + * @return Parser runnable + */ + private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractWholeFileNodeTupleReader reader, final InputStream input, + final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) { + return new Runnable() { + @Override + public void run() { + try { + ReaderRIOT riotReader = RDFDataMgr.createReader(lang); + riotReader.setParserProfile(profile); + riotReader.read(input, null, lang.getContentType(), stream, null); + reader.setParserFinished(null); + } catch (Throwable e) { + reader.setParserFinished(e); + } + } + }; + } + + /** + * Sets the parser thread finished state + * + * @param e + * Error (if any) + */ + private void setParserFinished(Throwable e) { + synchronized (this.parserThread) { + this.parserError = e; + this.parserFinished = true; + } + } + + /** + * Waits for the parser thread to have reported as finished + * + * @throws InterruptedException + */ + private void waitForParserFinished() throws InterruptedException { + do { + synchronized (this.parserThread) { + if (this.parserFinished) + return; + } + Thread.sleep(50); + } while (true); + } + + /** + * Creates an instance of a writable tuple from the given tuple value + * + * @param tuple + * Tuple value + * @return Writable tuple + */ + protected abstract T createInstance(TValue tuple); 
+ + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + // Reuse key for efficiency + if (key == null) { + key = new LongWritable(); + } + + if (this.finished) + return false; + + try { + if (this.iter.hasNext()) { + Long l = this.stream.getPosition(); + if (l != null) { + this.key.set(l); + // For compressed input the actual length from which we + // calculate progress is likely less than the actual + // uncompressed length so we may need to increment the + // length as we go along + // We always add 1 more than the current length because we + // don't want to report 100% progress until we really have + // finished + if (this.compressionCodecs != null && l > this.length) + this.length = l + 1; + } + this.tuple = this.createInstance(this.iter.next()); + return true; + } else { + // Need to ensure that the parser thread has finished in order + // to determine whether we finished without error + this.waitForParserFinished(); + if (this.parserError != null) { + LOG.error("Error parsing whole file, aborting further parsing", this.parserError); + if (!this.ignoreBadTuples) + throw new IOException("Error parsing whole file at position " + this.input.getBytesRead() + ", aborting further parsing", + this.parserError); + + } + + this.key = null; + this.tuple = null; + this.finished = true; + // This is necessary so that when compressed input is used we + // report 100% progress once we've reached the genuine end of + // the stream + if (this.compressionCodecs != null) + this.length--; + return false; + } + } catch (Throwable e) { + // Failed to read the tuple on this line + LOG.error("Error parsing whole file, aborting further parsing", e); + if (!this.ignoreBadTuples) { + this.iter.close(); + throw new IOException("Error parsing whole file at position " + this.input.getBytesRead() + ", aborting further parsing", e); + } + this.key = null; + this.tuple = null; + this.finished = true; + return false; + } + } + + @Override + public 
LongWritable getCurrentKey() throws IOException, InterruptedException { + return this.key; + } + + @Override + public T getCurrentValue() throws IOException, InterruptedException { + return this.tuple; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + float progress = 0.0f; + if (this.key == null) { + // We've either not started or we've finished + progress = (this.finished ? 1.0f : 0.0f); + } else if (this.key.get() == Long.MIN_VALUE) { + // We don't have a position so we've either in-progress or finished + progress = (this.finished ? 1.0f : 0.5f); + } else { + // We're some way through the file + progress = this.key.get() / (float) this.length; + } + LOG.debug("getProgress() --> {}", progress); + return progress; + } + + @Override + public void close() throws IOException { + this.iter.close(); + this.input.close(); + this.finished = true; + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java new file mode 100644 index 0000000..e525bea --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.readers; + +import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.riot.lang.PipedRDFIterator; + +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * An abstract record reader for whole file quad formats + * + * + * + */ +public abstract class AbstractWholeFileQuadReader extends AbstractWholeFileNodeTupleReader<Quad, QuadWritable> { + + @Override + protected PipedRDFIterator<Quad> getPipedIterator() { + return new PipedRDFIterator<Quad>(); + } + + @Override + protected TrackedPipedRDFStream<Quad> getPipedStream(PipedRDFIterator<Quad> iterator, TrackableInputStream input) { + return new TrackedPipedQuadsStream(iterator, input); + } + + @Override + protected QuadWritable createInstance(Quad tuple) { + return new QuadWritable(tuple); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java ---------------------------------------------------------------------- diff --git 
a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java new file mode 100644 index 0000000..8710b99 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.readers; + +import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream; +import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream; +import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.riot.lang.PipedRDFIterator; + +import com.hp.hpl.jena.graph.Triple; + +/** + * An abstract record reader for whole file triple formats + * + * + * + */ +public abstract class AbstractWholeFileTripleReader extends AbstractWholeFileNodeTupleReader<Triple, TripleWritable> { + + @Override + protected PipedRDFIterator<Triple> getPipedIterator() { + return new PipedRDFIterator<Triple>(); + } + + @Override + protected TrackedPipedRDFStream<Triple> getPipedStream(PipedRDFIterator<Triple> iterator, TrackableInputStream input) { + return new TrackedPipedTriplesStream(iterator, input); + } + + @Override + protected TripleWritable createInstance(Triple tuple) { + return new TripleWritable(tuple); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java new file mode 100644 index 0000000..26b0a8b --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.readers; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFLanguages; + +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * A record reader that reads quads from any RDF quads format + */ +public class QuadsReader extends AbstractRdfReader<Quad, QuadWritable> { + + @Override + protected RecordReader<LongWritable, QuadWritable> selectRecordReader(Lang lang) throws IOException { + if (!RDFLanguages.isQuads(lang)) + throw new IOException( + lang.getLabel() + + " is not a RDF quads format, perhaps you wanted TriplesInputFormat or TriplesOrQuadsInputFormat instead?"); + + // This will throw an appropriate error if the language does not support + // quads + return HadoopRdfIORegistry.createQuadReader(lang); + } + +}
