http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java new file mode 100644 index 0000000..bfc8503 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.readers.ntriples; + +import java.util.Iterator; + +import org.apache.jena.hadoop.rdf.io.input.readers.AbstractLineBasedTripleReader; +import org.apache.jena.riot.lang.LangNTriples; +import org.apache.jena.riot.system.ParserProfile; +import org.apache.jena.riot.tokens.Tokenizer; +import org.apache.jena.riot.tokens.TokenizerFactory; + +import com.hp.hpl.jena.graph.Triple; + +/** + * A record reader for NTriples + * + * + * + */ +public class NTriplesReader extends AbstractLineBasedTripleReader { + + @Override + protected Iterator<Triple> getTriplesIterator(Tokenizer tokenizer, ParserProfile profile) { + return new LangNTriples(tokenizer, profile, null); + } + + @Override + protected Tokenizer getTokenizer(String line) { + return TokenizerFactory.makeTokenizerString(line); + } +}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java new file mode 100644 index 0000000..c200d93 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.readers.ntriples; + +import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader; +import org.apache.jena.riot.Lang; + +/** + * A record reader for NTriples + * <p> + * Unlike the {@link NTriplesReader} this processes files as a whole rather than + * individual lines.
This has the advantage of less parser setup overhead but + * the disadvantage that the input cannot be split between multiple mappers. + * </p> + * + * + * + */ +public class WholeFileNTriplesReader extends AbstractWholeFileTripleReader { + + @Override + protected Lang getRdfLanguage() { + return Lang.NTRIPLES; + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java new file mode 100644 index 0000000..009024b --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.readers.rdfjson; + +import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader; +import org.apache.jena.riot.Lang; + +/** + * A record reader for RDF/JSON files + * + * + * + */ +public class RdfJsonReader extends AbstractWholeFileTripleReader { + + @Override + protected Lang getRdfLanguage() { + return Lang.RDFJSON; + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java new file mode 100644 index 0000000..9c374c6 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.readers.rdfxml; + +import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader; +import org.apache.jena.riot.Lang; + +/** + * A record reader for RDF/XML files + * + * + * + */ +public class RdfXmlReader extends AbstractWholeFileTripleReader { + + @Override + protected Lang getRdfLanguage() { + return Lang.RDFXML; + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java new file mode 100644 index 0000000..084b1ec --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.readers.thrift; + +import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFLanguages; + +public class ThriftQuadReader extends AbstractWholeFileQuadReader { + + @Override + protected Lang getRdfLanguage() { + return RDFLanguages.THRIFT; + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java new file mode 100644 index 0000000..713bfa7 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.readers.thrift; + +import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFLanguages; + +public class ThriftTripleReader extends AbstractWholeFileTripleReader { + @Override + protected Lang getRdfLanguage() { + return RDFLanguages.THRIFT; + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java new file mode 100644 index 0000000..b1b0c3c --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.readers.trig; + +import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader; +import org.apache.jena.riot.Lang; + +/** + * A record reader for TriG files + * + * + * + */ +public class TriGReader extends AbstractWholeFileQuadReader { + + @Override + protected Lang getRdfLanguage() { + return Lang.TRIG; + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java new file mode 100644 index 0000000..6873c64 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.readers.trix; + +import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader; +import org.apache.jena.riot.Lang; + +/** + * A record reader for TriX files + * + * + * + */ +public class TriXReader extends AbstractWholeFileQuadReader { + + @Override + protected Lang getRdfLanguage() { + return Lang.TRIX; + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/turtle/TurtleReader.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/turtle/TurtleReader.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/turtle/TurtleReader.java new file mode 100644 index 0000000..b3fb377 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/turtle/TurtleReader.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.readers.turtle; + +import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader; +import org.apache.jena.riot.Lang; + +/** + * A record reader for Turtle files + * + * + * + */ +public class TurtleReader extends AbstractWholeFileTripleReader { + + @Override + protected Lang getRdfLanguage() { + return Lang.TURTLE; + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java new file mode 100644 index 0000000..f75542a --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.thrift; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; +import org.apache.jena.hadoop.rdf.io.input.readers.thrift.ThriftQuadReader; +import org.apache.jena.hadoop.rdf.types.QuadWritable; + +public class ThriftQuadInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> { + + @Override + public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new ThriftQuadReader(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java new file mode 100644 index 0000000..b60380d --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.thrift; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; +import org.apache.jena.hadoop.rdf.io.input.readers.thrift.ThriftTripleReader; +import org.apache.jena.hadoop.rdf.types.TripleWritable; + +public class ThriftTripleInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> { + + @Override + public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new ThriftTripleReader(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trig/TriGInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trig/TriGInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trig/TriGInputFormat.java new file mode 100644 index 0000000..0b36e93 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trig/TriGInputFormat.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.trig; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; +import org.apache.jena.hadoop.rdf.io.input.readers.trig.TriGReader; +import org.apache.jena.hadoop.rdf.types.QuadWritable; + + +/** + * Input format for TriG + * + * + * + */ +public class TriGInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> { + + @Override + public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new TriGReader(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trix/TriXInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trix/TriXInputFormat.java 
b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trix/TriXInputFormat.java new file mode 100644 index 0000000..723c5c3 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trix/TriXInputFormat.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.trix; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; +import org.apache.jena.hadoop.rdf.io.input.readers.trix.TriXReader; +import org.apache.jena.hadoop.rdf.types.QuadWritable; + +/** + * Input format for TriX + */ +public class TriXInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> { + + @Override + public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new TriXReader(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/turtle/TurtleInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/turtle/TurtleInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/turtle/TurtleInputFormat.java new file mode 100644 index 0000000..c7771b6 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/turtle/TurtleInputFormat.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.turtle; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat; +import org.apache.jena.hadoop.rdf.io.input.readers.turtle.TurtleReader; +import org.apache.jena.hadoop.rdf.types.TripleWritable; + + +/** + * Turtle input format + * + * + * + */ +public class TurtleInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> { + + @Override + public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new TurtleReader(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/BlockInputStream.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/BlockInputStream.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/BlockInputStream.java new file mode 100644 index 0000000..a9e692e --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/BlockInputStream.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.util; + +import java.io.IOException; +import java.io.InputStream; + +/** + * A block input stream which is a wrapper around another input stream which + * restricts reading to a specific number of bytes and can report the number of + * bytes read + * <p> + * The class assumes that the underlying input stream has already been seeked to + * the appropriate start point + * </p> + * + * + * + */ +public final class BlockInputStream extends TrackedInputStream { + + private long limit = Long.MAX_VALUE; + + /** + * Creates a new block input stream + * + * @param input + * Input stream to track + * @param limit + * Maximum number of bytes to read from the stream + */ + public BlockInputStream(InputStream input, long limit) { + super(input); + if (limit < 0) + throw new IllegalArgumentException("limit must be >= 0"); + this.limit = limit; + } + + @Override + public int read() throws IOException { + if (this.bytesRead >= this.limit) { + return -1; + } + return super.read(); + } + + @Override + public int available() throws IOException { + if (this.bytesRead >= this.limit) { + return 0; + } + return (int) Math.min(super.available(), this.limit - this.bytesRead); // never report more than the block has left + } + + @Override + public int read(byte[] b, int off, int len) throws
IOException { + if (len == 0) { + return 0; + } else if (this.bytesRead >= this.limit) { + return -1; + } else if (len > this.limit - this.bytesRead) { + len = (int) (this.limit - this.bytesRead); + } + return super.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + if (n == 0) { + return 0; + } else if (this.bytesRead >= this.limit) { + return 0; // limit reached: nothing skipped (InputStream.skip never returns a negative count) + } else if (n > this.limit - this.bytesRead) { + n = this.limit - this.bytesRead; + } + return super.skip(n); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/RdfIOUtils.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/RdfIOUtils.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/RdfIOUtils.java new file mode 100644 index 0000000..372b22c --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/RdfIOUtils.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.jena.hadoop.rdf.io.input.util; + +import java.util.UUID; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.riot.lang.LabelToNode; +import org.apache.jena.riot.system.ErrorHandlerFactory; +import org.apache.jena.riot.system.IRIResolver; +import org.apache.jena.riot.system.ParserProfile; +import org.apache.jena.riot.system.ParserProfileBase; +import org.apache.jena.riot.system.Prologue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * RDF IO utility functions + * + * + * + */ +public class RdfIOUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(RdfIOUtils.class); + + /** + * Private constructor prevents instantiation + */ + private RdfIOUtils() { + } + + /** + * Creates a parser profile for the given job context + * + * @param context + * Context + * @param path + * File path + * @return Parser profile + */ + public static ParserProfile createParserProfile(JobContext context, Path path) { + Prologue prologue = new Prologue(null, IRIResolver.createNoResolve()); + UUID seed = RdfIOUtils.getSeed(context, path); + LabelToNode labelMapping = LabelToNode.createScopeByDocumentHash(seed); + return new ParserProfileBase(prologue, ErrorHandlerFactory.errorHandlerStd, labelMapping); + } + + /** + * Selects a seed for use in generating blank node identifiers + * + * @param context + * Job Context + * @param path + * File path + * @return Seed + */ + public static UUID getSeed(JobContext context, Path path) { + // This is to ensure that blank node allocation policy is constant when + // subsequent MapReduce jobs need that + String jobId = context.getJobID().toString(); + if (jobId == null) { + jobId = String.valueOf(System.currentTimeMillis()); + LOGGER.warn( + "Job ID was not set, using current milliseconds of {}. 
Sequence of MapReduce jobs must carefully handle blank nodes.", + jobId); + } + + if (!context.getConfiguration().getBoolean(RdfIOConstants.GLOBAL_BNODE_IDENTITY, false)) { + // Using normal file scoped blank node allocation + LOGGER.debug("Generating Blank Node Seed from Job Details (ID={}, Input Path={})", jobId, path); + + // Form a reproducible seed for the run + return new UUID(jobId.hashCode(), path.hashCode()); + } else { + // Using globally scoped blank node allocation + LOGGER.warn( + "Using globally scoped blank node allocation policy from Job Details (ID={}) - this is unsafe if your RDF inputs did not originate from a previous job", + jobId); + + return new UUID(jobId.hashCode(), 0); + } + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackableInputStream.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackableInputStream.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackableInputStream.java new file mode 100644 index 0000000..92e2df5 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackableInputStream.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.util; + +import java.io.InputStream; + +/** + * An input stream that tracks the number of bytes read + * + * + * + */ +public abstract class TrackableInputStream extends InputStream { + + /** + * Gets the number of bytes read + * + * @return Number of bytes read + */ + public abstract long getBytesRead(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedInputStream.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedInputStream.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedInputStream.java new file mode 100644 index 0000000..e51a866 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedInputStream.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.util; + +import java.io.IOException; +import java.io.InputStream; + +/** + * A tracked input stream which can is a wrapper around another input stream and + * can report the number of bytes read + * + * + * + */ +public class TrackedInputStream extends TrackableInputStream { + + protected InputStream input; + protected long bytesRead = 0, lastMark; + + /** + * Creates a new tracked input stream + * + * @param input + * Input stream to track + */ + public TrackedInputStream(InputStream input) { + if (input == null) + throw new NullPointerException("Input cannot be null"); + this.input = input; + } + + @Override + public int read() throws IOException { + int read = this.input.read(); + if (read >= 0) + this.bytesRead++; + return read; + } + + @Override + public long getBytesRead() { + return this.bytesRead; + } + + @Override + public void close() throws IOException { + this.input.close(); + } + + @Override + public int available() throws IOException { + return this.input.available(); + } + + @Override + public synchronized void mark(int readlimit) { + this.input.mark(readlimit); + this.lastMark = this.bytesRead; + } + + @Override + public boolean markSupported() { + return this.input.markSupported(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (len == 0) return 0; + int read = this.input.read(b, off, len); + if (read > 0) + this.bytesRead += read; + return read; + } + + @Override + public int read(byte[] b) throws IOException { + return this.read(b, 0, 
b.length); + } + + @Override + public synchronized void reset() throws IOException { + this.input.reset(); + this.bytesRead = this.lastMark; + } + + @Override + public long skip(long n) throws IOException { + if (n == 0) + return 0; + long skipped = 0; + byte[] buffer = new byte[16]; + int readSize = Math.min(buffer.length, n > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) n); + int read; + do { + if (n - skipped > readSize) { + read = this.input.read(buffer, 0, readSize); + } else { + read = this.input.read(buffer, 0, (int) (n - skipped)); + } + if (read > 0) { + this.bytesRead += read; + skipped += read; + } + } while (skipped < n && read >= 0); + + return skipped; + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedQuadsStream.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedQuadsStream.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedQuadsStream.java new file mode 100644 index 0000000..845c709 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedQuadsStream.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.util; + +import org.apache.jena.riot.lang.PipedRDFIterator; + +import com.hp.hpl.jena.graph.Triple; +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * A tracked piped quads stream + * + * + * + */ +public class TrackedPipedQuadsStream extends TrackedPipedRDFStream<Quad> { + + /** + * Creates a new stream + * + * @param sink + * Sink + * @param input + * Input stream + */ + public TrackedPipedQuadsStream(PipedRDFIterator<Quad> sink, TrackableInputStream input) { + super(sink, input); + } + + @Override + public void triple(Triple triple) { + // Triples are discarded + } + + @Override + public void quad(Quad quad) { + this.receive(quad); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedRDFStream.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedRDFStream.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedRDFStream.java new file mode 100644 index 0000000..6e910be --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedRDFStream.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input.util; + +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.jena.riot.lang.PipedRDFIterator; +import org.apache.jena.riot.lang.PipedRDFStream; + +/** + * A tracked piped RDF stream + * + * + * + * @param <T> + * Type corresponding to a supported RDF primitive + */ +public abstract class TrackedPipedRDFStream<T> extends PipedRDFStream<T> { + + private TrackableInputStream input; + private Queue<Long> positions = new LinkedList<Long>(); + + protected TrackedPipedRDFStream(PipedRDFIterator<T> sink, TrackableInputStream input) { + super(sink); + this.input = input; + } + + @Override + protected void receive(T t) { + // Track positions the input stream is at as we receive inputs + synchronized (this.positions) { + this.positions.add(this.input.getBytesRead()); + } + super.receive(t); + } + + /** + * Gets the next position + * + * @return Position + */ + public Long getPosition() { + synchronized (this.positions) { + return this.positions.poll(); + } + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedTriplesStream.java ---------------------------------------------------------------------- diff --git 
a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedTriplesStream.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedTriplesStream.java new file mode 100644 index 0000000..2040c4f --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedTriplesStream.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input.util; + +import org.apache.jena.riot.lang.PipedRDFIterator; + +import com.hp.hpl.jena.graph.Triple; +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * A tracked piped triples stream + * + * + * + */ +public class TrackedPipedTriplesStream extends TrackedPipedRDFStream<Triple> { + + /** + * Creates a tracked triples stream + * + * @param sink + * Sink + * @param input + * Input stream + */ + public TrackedPipedTriplesStream(PipedRDFIterator<Triple> sink, TrackableInputStream input) { + super(sink, input); + } + + @Override + public void triple(Triple triple) { + receive(triple); + } + + @Override + public void quad(Quad quad) { + // Quads are discarded + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java new file mode 100644 index 0000000..02fbf9c --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.output; + +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.hadoop.rdf.io.output.writers.AbstractBatchedNodeTupleWriter; +import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; + + +/** + * Abstract output format for formats that use a + * {@link AbstractBatchedNodeTupleWriter} as their writer + * + * + * + * @param <TKey> + * Key type + * @param <TTuple> + * Tuple type + * @param <TValue> + * Writable tuple type i.e. 
the value type + */ +public abstract class AbstractBatchedNodeTupleOutputFormat<TKey, TTuple, TValue extends AbstractNodeTupleWritable<TTuple>> extends + AbstractNodeTupleOutputFormat<TKey, TTuple, TValue> { + + @Override + protected RecordWriter<TKey, TValue> getRecordWriter(Writer writer, Configuration config, Path outputPath) { + long batchSize = config.getLong(RdfIOConstants.OUTPUT_BATCH_SIZE, RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE); + return this.getRecordWriter(writer, batchSize); + } + + protected abstract RecordWriter<TKey, TValue> getRecordWriter(Writer writer, long batchSize); + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeOutputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeOutputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeOutputFormat.java new file mode 100644 index 0000000..cfc98bd --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeOutputFormat.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.output; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.jena.hadoop.rdf.types.NodeWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Abstract output format which takes pairs with Node keys and arbitrary values + * and writes them as a simple line based text file + * + * + * + * @param <TValue> Value type + */ +public abstract class AbstractNodeOutputFormat<TValue> extends FileOutputFormat<NodeWritable, TValue> { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeOutputFormat.class); + + @Override + public RecordWriter<NodeWritable, TValue> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { + Configuration config = context.getConfiguration(); + boolean isCompressed = getCompressOutput(context); + CompressionCodec codec = null; + String extension = this.getFileExtension(); + if (isCompressed) { + Class<? 
extends CompressionCodec> codecClass = getOutputCompressorClass(context, GzipCodec.class); + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, config); + extension += codec.getDefaultExtension(); + } + Path file = getDefaultWorkFile(context, extension); + LOG.info("Writing output to file " + file); + FileSystem fs = file.getFileSystem(config); + if (!isCompressed) { + FSDataOutputStream fileOut = fs.create(file, false); + return this.getRecordWriter(new OutputStreamWriter(fileOut), config); + } else { + FSDataOutputStream fileOut = fs.create(file, false); + return this.getRecordWriter(new OutputStreamWriter(codec.createOutputStream(fileOut)), config); + } + } + + /** + * Gets the file extension to use for output + * + * @return File extension including the '.' + */ + protected String getFileExtension() { + return ".nodes"; + } + + /** + * Gets the record writer to use + * + * @param writer + * Writer to write output to + * @param config + * Configuration + * @return Record writer + */ + protected abstract RecordWriter<NodeWritable, TValue> getRecordWriter(Writer writer, Configuration config); +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormat.java new file mode 100644 index 0000000..c4a34f5 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormat.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.output; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An abstract implementation of an output format for line based tuple formats + * where the key is ignored and only the tuple values will be output + * + * + * @param <TKey> + * Key type + * @param <TValue> + * Tuple value type + * @param <T> + * Writable node tuple type + * + */ +public abstract class AbstractNodeTupleOutputFormat<TKey, TValue, T extends AbstractNodeTupleWritable<TValue>> extends + FileOutputFormat<TKey, T> { + + private static final Logger LOG = 
LoggerFactory.getLogger(AbstractNodeTupleOutputFormat.class); + + @Override + public RecordWriter<TKey, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { + Configuration config = context.getConfiguration(); + boolean isCompressed = getCompressOutput(context); + CompressionCodec codec = null; + + // Build the output file path + String extension = this.getFileExtension(); + if (isCompressed) { + // Add compression extension if applicable + Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(context, GzipCodec.class); + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, config); + extension += codec.getDefaultExtension(); + } + Path file = getDefaultWorkFile(context, extension); + LOG.info("Writing output to file " + file); + + // Open the file appropriately and create a record writer for it + FileSystem fs = file.getFileSystem(config); + if (!isCompressed) { + FSDataOutputStream fileOut = fs.create(file, false); + return this.getRecordWriter(new OutputStreamWriter(fileOut), config, file); + } else { + FSDataOutputStream fileOut = fs.create(file, false); + return this.getRecordWriter(new OutputStreamWriter(codec.createOutputStream(fileOut)), config, file); + } + } + + /** + * Gets the file extension to use for output + * + * @return File extension including the '.' 
+ */ + protected abstract String getFileExtension(); + + /** + * Gets the record writer to use + * + * @param writer + * Writer to write output to + * @param config + * Configuration + * @param outputPath + * Output path being written to + * @return Record writer + * @throws IOException + * May be thrown if a record writer cannot be obtained for any + * reason + */ + protected abstract RecordWriter<TKey, T> getRecordWriter(Writer writer, Configuration config, Path outputPath) + throws IOException; + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java new file mode 100644 index 0000000..30999ae --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.output; + +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; +import org.apache.jena.riot.system.StreamRDF; + +/** + * Abstract output format for formats that use the RIOT {@link StreamRDF} API to + * stream the writes + * + * @param <TKey> + * Key type + * @param <TTuple> + * Tuple type + * @param <TValue> + * Writable tuple type i.e. the value type + */ +public abstract class AbstractStreamRdfNodeTupleOutputFormat<TKey, TTuple, TValue extends AbstractNodeTupleWritable<TTuple>> + extends AbstractNodeTupleOutputFormat<TKey, TTuple, TValue> { + + @Override + protected RecordWriter<TKey, TValue> getRecordWriter(Writer writer, Configuration config, Path outputPath) { + return getRecordWriter(getStream(writer, config), writer, config); + } + + /** + * Gets a writer which provides a bridge between the {@link RecordWriter} + * and {@link StreamRDF} APIs + * + * @param stream + * RDF Stream + * @param writer + * Writer + * @param config + * Configuration + * @return Record Writer + */ + protected abstract RecordWriter<TKey, TValue> getRecordWriter(StreamRDF stream, Writer writer, Configuration config); + + /** + * Gets a {@link StreamRDF} to which the tuples to be output should be + * passed + * + * @param writer + * Writer + * @param config + * Configuration + * @return RDF Stream + */ + protected abstract StreamRDF getStream(Writer writer, Configuration config); +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/QuadsOutputFormat.java ---------------------------------------------------------------------- diff --git 
a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/QuadsOutputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/QuadsOutputFormat.java new file mode 100644 index 0000000..cc9fe2f --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/QuadsOutputFormat.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jena.hadoop.rdf.io.output; + +import java.io.IOException; +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFLanguages; + +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * An output format for RDF quads that dynamically selects the appropriate quad + * writer to use based on the file extension of the output file. + * <p> + * For example this is useful when the output format may be controlled by a user + * supplied filename i.e. 
the desired RDF output format is not precisely known + * in advance + * </p> + * + * @param <TKey> + * Key type + */ +public abstract class QuadsOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<TKey, Quad, QuadWritable> { + + @Override + protected RecordWriter<TKey, QuadWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath) + throws IOException { + Lang lang = RDFLanguages.filenameToLang(outputPath.getName()); + if (lang == null) + throw new IOException("There is no registered RDF language for the output file " + outputPath.toString()); + + if (!RDFLanguages.isQuads(lang)) + throw new IOException( + lang.getName() + + " is not a RDF quads format, perhaps you wanted TriplesOutputFormat or TriplesOrQuadsOutputFormat instead?"); + + // This will throw an appropriate error if the language does not support + // writing quads + return HadoopRdfIORegistry.<TKey> createQuadWriter(lang, writer, config); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOrQuadsOutputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOrQuadsOutputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOrQuadsOutputFormat.java new file mode 100644 index 0000000..3eaf0d7 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOrQuadsOutputFormat.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jena.hadoop.rdf.io.output; + +import java.io.IOException; +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.jena.hadoop.rdf.io.output.writers.QuadsToTriplesWriter; +import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFLanguages; + +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * An output format for RDF triples/quads that dynamically selects the + * appropriate triple/quad writer to use based on the file extension of the + * output file. + * <p> + * For example this is useful when the output format may be controlled by a user + * supplied filename i.e. the desired RDF output format is not precisely known + * in advance. + * </p> + * <h3>Warning</h3> + * <p> + * Where the format is determined to be triples the quads are converted into + * triples and thus will lose any graph information that might be carried. 
+ * </p> + * + * @param <TKey> + * Key type + */ +public abstract class TriplesOrQuadsOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<TKey, Quad, QuadWritable> { + + @Override + protected RecordWriter<TKey, QuadWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath) + throws IOException { + Lang lang = RDFLanguages.filenameToLang(outputPath.getName()); + if (lang == null) + throw new IOException("There is no registered RDF language for the output file " + outputPath.toString()); + + if (!RDFLanguages.isQuads(lang) && !RDFLanguages.isTriples(lang)) + throw new IOException(lang.getName() + " is not a RDF triples/quads format"); + + if (HadoopRdfIORegistry.hasQuadWriter(lang)) { + // Supports quads directly + return HadoopRdfIORegistry.<TKey> createQuadWriter(lang, writer, config); + } else { + // Try to create a triples writer and wrap downwards from quads + // This will throw an error if a triple writer is not available + return new QuadsToTriplesWriter<TKey>(HadoopRdfIORegistry.<TKey> createTripleWriter(lang, writer, config)); + } + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOutputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOutputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOutputFormat.java new file mode 100644 index 0000000..d9d4189 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOutputFormat.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jena.hadoop.rdf.io.output; + +import java.io.IOException; +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry; +import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFLanguages; + +import com.hp.hpl.jena.graph.Triple; + +/** + * An output format for RDF triples that dynamically selects the appropriate triple + * writer to use based on the file extension of the output file. + * <p> + * For example this is useful when the output format may be controlled by a user + * supplied filename i.e. 
the desired RDF output format is not precisely known + * in advance + * </p> + * + * @param <TKey> + * Key type + */ +public abstract class TriplesOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<TKey, Triple, TripleWritable> { + + @Override + protected RecordWriter<TKey, TripleWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath) throws IOException { + Lang lang = RDFLanguages.filenameToLang(outputPath.getName()); + if (lang == null) + throw new IOException("There is no registered RDF language for the output file " + outputPath.toString()); + + if (!RDFLanguages.isTriples(lang)) throw new IOException( + lang.getName() + + " is not a RDF triples format, perhaps you wanted QuadsOutputFormat or TriplesOrQuadsOutputFormat instead?"); + + // This will throw an appropriate error if the language does not support writing triples + return HadoopRdfIORegistry.<TKey>createTripleWriter(lang, writer, config); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDQuadOutputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDQuadOutputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDQuadOutputFormat.java new file mode 100644 index 0000000..8f4797a --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDQuadOutputFormat.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.output.jsonld; + +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.jena.hadoop.rdf.io.output.AbstractNodeTupleOutputFormat; +import org.apache.jena.hadoop.rdf.io.output.writers.jsonld.JsonLDQuadWriter; +import org.apache.jena.hadoop.rdf.types.QuadWritable; + +import com.hp.hpl.jena.sparql.core.Quad; + +public class JsonLDQuadOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<TKey, Quad, QuadWritable> { + + @Override + protected String getFileExtension() { + return ".jsonld"; + } + + @Override + protected RecordWriter<TKey, QuadWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath) { + return new JsonLDQuadWriter<TKey>(writer); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDTripleOutputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDTripleOutputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDTripleOutputFormat.java new file mode 100644 index 0000000..a8cbeac --- /dev/null +++ 
b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDTripleOutputFormat.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.output.jsonld; + +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.jena.hadoop.rdf.io.output.AbstractNodeTupleOutputFormat; +import org.apache.jena.hadoop.rdf.io.output.writers.jsonld.JsonLDTripleWriter; +import org.apache.jena.hadoop.rdf.types.TripleWritable; + +import com.hp.hpl.jena.graph.Triple; +
+/**
+ * Triple output format backed by {@link JsonLDTripleWriter}, reporting
+ * {@code .jsonld} as its file extension
+ *
+ * @param <TKey>
+ *            Key type
+ */
+public class JsonLDTripleOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<TKey, Triple, TripleWritable> {
+
+    /**
+     * Creates a {@link JsonLDTripleWriter} over the given writer
+     *
+     * @param writer
+     *            Writer
+     * @param config
+     *            Configuration, not used by this implementation
+     * @param outputPath
+     *            Output path, not used by this implementation
+     * @return Record Writer
+     */
+    @Override
+    protected RecordWriter<TKey, TripleWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath) {
+        final RecordWriter<TKey, TripleWritable> tripleWriter = new JsonLDTripleWriter<TKey>(writer);
+        return tripleWriter;
+    }
+
+    /**
+     * Gets the file extension for this format
+     *
+     * @return The extension {@code .jsonld}
+     */
+    @Override
+    protected String getFileExtension() {
+        return ".jsonld";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/nquads/NQuadsOutputFormat.java 
---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/nquads/NQuadsOutputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/nquads/NQuadsOutputFormat.java new file mode 100644 index 0000000..a8ab017 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/nquads/NQuadsOutputFormat.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.output.nquads; + +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.jena.hadoop.rdf.io.output.AbstractNodeTupleOutputFormat; +import org.apache.jena.hadoop.rdf.io.output.writers.nquads.NQuadsWriter; +import org.apache.jena.hadoop.rdf.types.QuadWritable; + +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * NQuads output format + * + * + * + * @param <TKey> + * Key type + */ +public class NQuadsOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<TKey, Quad, QuadWritable> { + + @Override + protected RecordWriter<TKey, QuadWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath) { + return new NQuadsWriter<TKey>(writer); + } + + @Override + protected String getFileExtension() { + return ".nq"; + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/ntriples/NTriplesNodeOutputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/ntriples/NTriplesNodeOutputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/ntriples/NTriplesNodeOutputFormat.java new file mode 100644 index 0000000..56935bb --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/ntriples/NTriplesNodeOutputFormat.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.output.ntriples; + +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.jena.hadoop.rdf.io.output.AbstractNodeOutputFormat; +import org.apache.jena.hadoop.rdf.io.output.writers.ntriples.NTriplesNodeWriter; +import org.apache.jena.hadoop.rdf.types.NodeWritable; + + +/** + * NTriples based node output format + * + * + * + * @param <TValue> + * Value type + */ +public class NTriplesNodeOutputFormat<TValue> extends AbstractNodeOutputFormat<TValue> { + + @Override + protected RecordWriter<NodeWritable, TValue> getRecordWriter(Writer writer, Configuration config) { + return new NTriplesNodeWriter<TValue>(writer); + } + +}
