GORA-513 Add initial OrientDB datastore impl
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/71a95b98 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/71a95b98 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/71a95b98 Branch: refs/heads/master Commit: 71a95b98f37d268eeafd2fb653ee1b2158416eaf Parents: d5e5560 Author: Kevin Ratnasekera <djkevi...@yahoo.com> Authored: Wed Jul 12 03:08:24 2017 +0530 Committer: Kevin Ratnasekera <djkevi...@yahoo.com> Committed: Wed Jul 12 03:08:24 2017 +0530 ---------------------------------------------------------------------- gora-orientdb/pom.xml | 184 ++++ .../org/apache/gora/orientdb/package-info.java | 23 + .../gora/orientdb/query/OrientDBQuery.java | 113 +++ .../gora/orientdb/query/OrientDBResult.java | 103 +++ .../gora/orientdb/query/package-info.java | 23 + .../gora/orientdb/store/OrientDBMapping.java | 177 ++++ .../orientdb/store/OrientDBMappingBuilder.java | 123 +++ .../gora/orientdb/store/OrientDBStore.java | 922 +++++++++++++++++++ .../orientdb/store/OrientDBStoreParameters.java | 157 ++++ .../gora/orientdb/store/package-info.java | 23 + .../gora/orientdb/GoraOrientDBTestDriver.java | 75 ++ .../mapreduce/OrientDBStoreMapReduceTest.java | 66 ++ .../gora/orientdb/mapreduce/package-info.java | 22 + .../org/apache/gora/orientdb/package-info.java | 23 + .../store/OrientDBGoraDataStoreTest.java | 65 ++ .../gora/orientdb/store/package-info.java | 22 + .../test/resources/gora-orientdb-mapping.xml | 45 + .../src/test/resources/gora.properties | 23 + .../test/resources/orientdb-server-config.xml | 35 + pom.xml | 36 + 20 files changed, 2260 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/pom.xml ---------------------------------------------------------------------- diff --git a/gora-orientdb/pom.xml b/gora-orientdb/pom.xml new file mode 100644 index 0000000..50a84dd --- /dev/null +++ 
b/gora-orientdb/pom.xml @@ -0,0 +1,184 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.gora</groupId> + <artifactId>gora</artifactId> + <version>0.8-SNAPSHOT</version> + <relativePath>../</relativePath> + </parent> + <artifactId>gora-orientdb</artifactId> + <packaging>bundle</packaging> + + <name>Apache Gora :: OrientDB</name> + <url>http://gora.apache.org</url> + <description>The Apache Gora open source framework provides an in-memory data model and + persistence for big data. 
Gora supports persisting to column stores, key value stores, + document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce + support.</description> + <inceptionYear>2010</inceptionYear> + <organization> + <name>The Apache Software Foundation</name> + <url>http://www.apache.org/</url> + </organization> + <issueManagement> + <system>JIRA</system> + <url>https://issues.apache.org/jira/browse/GORA</url> + </issueManagement> + <ciManagement> + <system>Jenkins</system> + <url>https://builds.apache.org/job/Gora-trunk/</url> + </ciManagement> + + <properties> + <osgi.import>*</osgi.import> + <osgi.export>org.apache.gora.orientdb*;version="${project.version}";-noimport:=true</osgi.export> + </properties> + + <build> + <directory>target</directory> + <outputDirectory>target/classes</outputDirectory> + <finalName>${project.artifactId}-${project.version}</finalName> + <testOutputDirectory>target/test-classes</testOutputDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <sourceDirectory>src/main/java</sourceDirectory> + <testResources> + <testResource> + <directory>${project.basedir}/src/test/resources</directory> + <includes> + <include>**/*</include> + </includes> + <!--targetPath>${project.basedir}/target/classes/</targetPath --> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>${build-helper-maven-plugin.version}</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/examples/java</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <!-- Gora Internal Dependencies --> + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.gora</groupId> + 
<artifactId>gora-core</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + + <!-- Logging Dependencies --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <exclusions> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Testing Dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> + + <!-- OrientDB Dependencies --> + <dependency> + <groupId>com.orientechnologies</groupId> + <artifactId>orientdb-server</artifactId> + </dependency> + + <dependency> + <groupId>com.orientechnologies</groupId> + <artifactId>orientdb-core</artifactId> + </dependency> + + <dependency> + <groupId>com.orientechnologies</groupId> + <artifactId>orientdb-graphdb</artifactId> + </dependency> + + <dependency> + <groupId>com.orientechnologies</groupId> + <artifactId>orientdb-client</artifactId> + </dependency> + + <dependency> + <groupId>com.github.raymanrt</groupId> + <artifactId>orientqb</artifactId> + </dependency> + + <dependency> + <groupId>org.jdom</groupId> + <artifactId>jdom</artifactId> + <scope>compile</scope> + </dependency> + + + <!-- Hadoop Dependencies --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/main/java/org/apache/gora/orientdb/package-info.java ---------------------------------------------------------------------- diff --git 
a/gora-orientdb/src/main/java/org/apache/gora/orientdb/package-info.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/package-info.java new file mode 100644 index 0000000..1eecd09 --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package encapsulates all OrientDB dataStore related class implementations. + * + */ +package org.apache.gora.orientdb; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBQuery.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBQuery.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBQuery.java new file mode 100644 index 0000000..209549a --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBQuery.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.query; + +import com.github.raymanrt.orientqb.query.Parameter; +import com.github.raymanrt.orientqb.query.Projection; +import com.orientechnologies.orient.core.record.impl.ODocument; +import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery; +import org.apache.gora.orientdb.store.OrientDBMapping; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.impl.QueryBase; +import org.apache.gora.store.DataStore; +import com.github.raymanrt.orientqb.query.Query; + +import java.util.HashMap; +import java.util.Map; + +import static com.github.raymanrt.orientqb.query.Projection.projection; + + +/** + * OrientDB specific implementation of the {@link org.apache.gora.query.Query} interface. + */ +public class OrientDBQuery<K, T extends PersistentBase> extends QueryBase<K, T> { + + private OSQLSynchQuery<ODocument> dbQuery; + private Map<String, Object> params; + + public OrientDBQuery() { + super(null); + } + + public OrientDBQuery(DataStore<K, T> dataStore) { + super(dataStore); + } + + /** + * Return populated {@link OSQLSynchQuery} Orient DB query. + * + * @return a {@link OSQLSynchQuery} query executable over Orient DB. 
+ */ + public OSQLSynchQuery<ODocument> getOrientDBQuery() { + return dbQuery; + } + + /** + * Dynamic parameters for {@link OSQLSynchQuery} Orient DB query. + * + * @return a param map related to {@link OSQLSynchQuery} Orient DB query. + */ + public Map<String, Object> getParams() { + return params; + } + + /** + * Convert Gora query to Orient DB specific query which underline API understands. + * And maintain it s state encapsulated to Gora implementation of the {@link org.apache.gora.query.Query}. + * + * @return a {@link OSQLSynchQuery} query executable over Orient DB. + */ + public OSQLSynchQuery<ODocument> populateOrientDBQuery(final OrientDBMapping orientDBMapping, + final String[] fields, + final String[] schemaFields) { + params = new HashMap<String, Object>(); + Query selectQuery = new Query(); + selectQuery.from(orientDBMapping.getDocumentClass()); + if ((this.getStartKey() != null) && (this.getEndKey() != null) + && this.getStartKey().equals(this.getEndKey())) { + selectQuery.where(projection("_id").eq(Parameter.parameter("key"))); + params.put("key", this.getStartKey()); + } else if (this.getStartKey() != null || this.getEndKey() != null) { + if (this.getStartKey() != null) { + selectQuery.where(projection("_id").ge(Parameter.parameter("key_lower"))); + params.put("key_lower", this.getStartKey()); + } + if (this.getEndKey() != null) { + selectQuery.where(projection("_id").le(Parameter.parameter("key_upper"))); + params.put("key_upper", this.getEndKey()); + } + } + + if (fields.length == schemaFields.length) { + selectQuery.select(Projection.ALL); + } else { + for (String k : fields) { + String dbFieldName = orientDBMapping.getDocumentField(k); + if (dbFieldName != null && dbFieldName.length() > 0) { + selectQuery.select(dbFieldName); + } + } + selectQuery.select("_id"); + } + dbQuery = new OSQLSynchQuery<ODocument>(selectQuery.toString()); + return dbQuery; + } + +} 
http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBResult.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBResult.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBResult.java new file mode 100644 index 0000000..23df5fa --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBResult.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.orientdb.query; + +import java.io.IOException; +import java.util.Iterator; + +import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; +import com.orientechnologies.orient.core.record.impl.ODocument; +import com.orientechnologies.orient.core.sql.query.OConcurrentResultSet; +import org.apache.gora.orientdb.store.OrientDBStore; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.query.impl.ResultBase; +import org.apache.gora.store.DataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * OrientDB specific implementation of the {@link org.apache.gora.query.Result} interface. + * + */ +public class OrientDBResult<K, T extends PersistentBase> extends ResultBase<K, T> { + + /** + * Reference to the OrientDB Results set. + */ + private OConcurrentResultSet<ODocument> resultSet; + private int size; + private static final Logger log = LoggerFactory.getLogger(OrientDBResult.class); + private Iterator<ODocument> resultSetIterator; + + public OrientDBResult(DataStore<K, T> dataStore, Query<K, T> query) { + super(dataStore, query); + } + + public OrientDBResult(DataStore<K, T> dataStore, + Query<K, T> query, + OConcurrentResultSet<ODocument> resultSet) { + super(dataStore, query); + this.resultSet = resultSet; + this.size = resultSet.size(); + this.resultSetIterator = resultSet.iterator(); + } + + public OrientDBStore<K, T> getDataStore() { + return (OrientDBStore<K, T>) super.getDataStore(); + } + + @Override + public float getProgress() throws IOException { + if (resultSet == null) { + return 0; + } else if (size == 0) { + return 1; + } else { + return offset / (float) size; + } + } + + @Override + public void close() throws IOException { + resultSet.clear(); + } + + @Override + protected boolean nextInner() throws IOException { + ODatabaseDocumentTx loadTx = ((OrientDBStore<K, T>) getDataStore()) + .getConnectionPool().acquire(); + 
loadTx.activateOnCurrentThread(); + try { + + if (!resultSetIterator.hasNext()) { + return false; + } + + ODocument obj = resultSetIterator.next(); + key = (K) obj.field("_id"); + persistent = ((OrientDBStore<K, T>) getDataStore()) + .convertOrientDocToAvroBean(obj, getQuery().getFields()); + return persistent != null; + } finally { + loadTx.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/package-info.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/package-info.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/package-info.java new file mode 100644 index 0000000..0812a12 --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains all the OrientDB dataStore query representation class as well as Result set representing class + * when query is executed over the OrientDB dataStore. 
+ */ +package org.apache.gora.orientdb.query; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMapping.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMapping.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMapping.java new file mode 100644 index 0000000..a37c3d8 --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMapping.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.store; + +import com.orientechnologies.orient.core.metadata.schema.OType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.orientechnologies.orient.core.metadata.schema.OSchemaShared; +import com.orientechnologies.orient.core.record.impl.ODocument; +import org.apache.avro.specific.SpecificRecord; + +import java.util.HashMap; +import java.util.Locale; + +/** + * Maintains mapping between AVRO data bean and OrientDB document. 
+ */ +public class OrientDBMapping { + + public static final Logger log = LoggerFactory.getLogger(OrientDBMapping.class); + + private String documentClass; + private HashMap<String, String> classToDocument = new HashMap<>(); + private HashMap<String, String> documentToClass = new HashMap<>(); + private HashMap<String, DocumentFieldType> documentFields = new HashMap<>(); + + /** + * Returns main OrientDB document class which matches to persistent bean. + * + * @return a {@link ODocument} class name. + */ + public String getDocumentClass() { + return documentClass; + } + + /** + * Setter for main OrientDB document class which matches for persistent bean. + * + * @param documentClass {@link ODocument} class name. + */ + public void setDocumentClass(String documentClass) { + this.documentClass = documentClass; + } + + /** + * Register mapping {@link com.orientechnologies.orient.core.record.impl.ODocument} + * field name to it s data type {@link OType} + * + * @param name {@link ODocument} class name. + * @param type {@link DocumentFieldType} field data type. + */ + private void registerDocumentField(String name, DocumentFieldType type) { + if (OSchemaShared.checkClassNameIfValid(name) != null) { + throw new IllegalArgumentException("'" + name + + "' is an invalid field name for a OrientDB document. '" + + OSchemaShared.checkClassNameIfValid(name) + "' is not a valid character."); + } + if (documentFields.containsKey(name) && (documentFields.get(name) != type)) + throw new IllegalStateException("The field '" + name + "' is already " + + "registered with a different type."); + documentFields.put(name, type); + } + + /** + * Register mapping between AVRO {@link SpecificRecord} + * record field, ODocument field and it's type. + * + * @param classFieldName {@link SpecificRecord} field name. + * @param docFieldName {@link ODocument} field name. + * @param fieldType {@link DocumentFieldType} field data type as string. 
+ */ + public void registerClassField(String classFieldName, + String docFieldName, String fieldType) { + try { + registerDocumentField(docFieldName, + DocumentFieldType.valueOf(fieldType.toUpperCase(Locale.getDefault()))); + } catch (final IllegalArgumentException e) { + throw new IllegalStateException("Declared '" + fieldType + + "' for class field '" + classFieldName + + "' is not supported by OrientDBMapping"); + } + + if (classToDocument.containsKey(classFieldName)) { + if (!classToDocument.get(classFieldName).equals(docFieldName)) { + throw new IllegalStateException("The class field '" + classFieldName + + "' is already registered in the mapping" + + " with the document field '" + + classToDocument.get(classFieldName) + + " which differs from the new one '" + docFieldName + "'."); + } + } else { + classToDocument.put(classFieldName, docFieldName); + documentToClass.put(docFieldName, classFieldName); + } + } + + /** + * Returns all fields in AVRO {@link org.apache.hadoop.io.serializer.avro.Record} record. + * + * @return array of fields in string. + */ + public String[] getDocumentFields() { + return documentToClass.keySet().toArray(new String[documentToClass.keySet().size()]); + } + + /** + * Return ODocument name given it's mapped AVRO {@link SpecificRecord} + * record field name. + * + * @param field AVRO record field name in string + * @return matching ODocument {@link ODocument} field name in string. + */ + public String getDocumentField(String field) { + return classToDocument.get(field); + } + + /** + * Return ODocument name given it's mapped AVRO {@link SpecificRecord} + * record field name. + * + * @param field AVRO record field name in string + * @return matching ODocument {@link ODocument} field name in string. 
+ */ + protected DocumentFieldType getDocumentFieldType(String field) { + return documentFields.get(field); + } + + /** + * Currently supporting data types from OrientDB data types {@link OType} + */ + public static enum DocumentFieldType { + + INTEGER(OType.INTEGER.name()), + LONG(OType.LONG.name()), + FLOAT(OType.FLOAT.name()), + SHORT(OType.SHORT.name()), + DOUBLE(OType.DOUBLE.name()), + STRING(OType.STRING.name()), + ANY(OType.ANY.name()), + TRANSIENT(OType.TRANSIENT.name()), + BINARY(OType.BINARY.name()), + DATE(OType.DATE.name()), + DATETIME(OType.DATETIME.name()), + EMBEDDED(OType.EMBEDDED.name()), + EMBEDDEDLIST(OType.EMBEDDEDLIST.name()), + EMBEDDEDSET(OType.EMBEDDEDSET.name()), + EMBEDDEDMAP(OType.EMBEDDEDMAP.name()); + + private final String stringValue; + + DocumentFieldType(final String s) { + stringValue = s; + } + + public String toString() { + return stringValue; + } + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMappingBuilder.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMappingBuilder.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMappingBuilder.java new file mode 100644 index 0000000..4c2d68c --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMappingBuilder.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.store; + +import org.apache.gora.persistency.impl.PersistentBase; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import org.jdom.Document; +import org.jdom.Element; +import org.jdom.input.SAXBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility builder for create OrientDB mapping from gora-orientdb-mapping.xml. + */ +public class OrientDBMappingBuilder<K, T extends PersistentBase> { + + public static final String ATT_NAME = "name"; + public static final String ATT_TYPE = "type"; + public static final String TAG_CLASS = "class"; + public static final String ATT_KEYCLASS = "keyClass"; + public static final String ATT_DOCUMENT = "document"; + public static final String TAG_FIELD = "field"; + public static final String ATT_FIELD = "docfield"; + public static final Logger log = LoggerFactory.getLogger(OrientDBMapping.class); + + private final OrientDBStore<K, T> dataStore; + + private OrientDBMapping mapping; + + public OrientDBMappingBuilder(final OrientDBStore<K, T> store) { + this.dataStore = store; + this.mapping = new OrientDBMapping(); + } + + /** + * Build OrientDB dataStore mapping from gora-orientdb-mapping.xml given from class path + * or file system location. 
+ */ + public OrientDBMapping build() { + if (mapping.getDocumentClass() == null) + throw new IllegalStateException("Document Class is not specified."); + return mapping; + } + + protected OrientDBMappingBuilder fromFile(String uri) throws IOException { + try { + SAXBuilder saxBuilder = new SAXBuilder(); + InputStream is = getClass().getResourceAsStream(uri); + if (is == null) { + String msg = "Unable to load the mapping from classpath resource '" + uri + + "' Re-trying local from local file system location."; + log.warn(msg); + is = new FileInputStream(uri); + } + Document doc = saxBuilder.build(is); + Element root = doc.getRootElement(); + List<Element> classElements = root.getChildren(TAG_CLASS); + for (Element classElement : classElements) { + final Class<T> persistentClass = dataStore.getPersistentClass(); + final Class<K> keyClass = dataStore.getKeyClass(); + if (matchesKeyClassWithMapping(keyClass, classElement) + && matchesPersistentClassWithMapping(persistentClass, classElement)) { + loadPersistentClass(classElement, persistentClass); + break; + } + } + } catch (IOException ex) { + throw ex; + } catch (Exception ex) { + throw new IOException(ex); + } + return this; + } + + private boolean matchesPersistentClassWithMapping(final Class<T> persistentClass, + final Element classElement) { + return classElement.getAttributeValue(ATT_NAME).equals(persistentClass.getName()); + } + + private boolean matchesKeyClassWithMapping(final Class<K> keyClass, + final Element classElement) { + return classElement.getAttributeValue(ATT_KEYCLASS).equals(keyClass.getName()); + } + + private void loadPersistentClass(Element classElement, + Class<T> pPersistentClass) { + + String docClassFromMapping = classElement.getAttributeValue(ATT_DOCUMENT); + String resolvedDocClass = dataStore.getSchemaName(docClassFromMapping, + pPersistentClass); + mapping.setDocumentClass(resolvedDocClass); + + List<Element> fields = classElement.getChildren(TAG_FIELD); + for (Element field : fields) { 
+ mapping.registerClassField(field.getAttributeValue(ATT_NAME), + field.getAttributeValue(ATT_FIELD), + field.getAttributeValue(ATT_TYPE)); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java new file mode 100644 index 0000000..d0b901f --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java @@ -0,0 +1,922 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.orientdb.store; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Properties; +import java.util.List; +import java.util.HashMap; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.Date; +import java.util.Calendar; +import java.util.Collection; +import java.util.TimeZone; +import java.util.Locale; + +import com.github.raymanrt.orientqb.query.Parameter; +import com.github.raymanrt.orientqb.delete.Delete; +import com.orientechnologies.orient.client.remote.OServerAdmin; +import com.orientechnologies.orient.core.db.OPartitionedDatabasePool; +import com.orientechnologies.orient.core.db.OPartitionedDatabasePoolFactory; +import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; +import com.orientechnologies.orient.core.db.record.OTrackedList; +import com.orientechnologies.orient.core.db.record.OTrackedMap; +import com.orientechnologies.orient.core.db.record.OTrackedSet; +import com.orientechnologies.orient.core.metadata.schema.OClass; +import com.orientechnologies.orient.core.metadata.schema.OType; +import com.orientechnologies.orient.core.record.impl.ODocument; +import com.orientechnologies.orient.core.sql.OCommandSQL; +import com.orientechnologies.orient.core.sql.query.OConcurrentResultSet; +import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery; +import org.apache.avro.Schema; +import org.apache.avro.util.Utf8; +import org.apache.gora.orientdb.query.OrientDBQuery; +import org.apache.gora.orientdb.query.OrientDBResult; +import org.apache.gora.persistency.impl.BeanFactoryImpl; +import org.apache.gora.persistency.impl.DirtyListWrapper; +import org.apache.gora.persistency.impl.DirtyMapWrapper; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.PartitionQuery; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.query.impl.PartitionQueryImpl; +import 
org.apache.gora.store.impl.DataStoreBase; +import org.apache.gora.util.AvroUtils; +import org.apache.gora.util.ClassLoadingUtils; + +import javax.xml.bind.DatatypeConverter; + +import static com.github.raymanrt.orientqb.query.Projection.projection; + +/** + * {@link org.apache.gora.orientdb.store.OrientDBStore} is the primary class + * responsible for facilitating GORA CRUD operations on OrientDB documents. + */ +public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { + + public static final String DEFAULT_MAPPING_FILE = "/gora-orientdb-mapping.xml"; + private String ROOT_URL; + private String ROOT_DATABASE_URL; + private OrientDBStoreParameters orientDbStoreParams; + private OrientDBMapping orientDBMapping; + private OServerAdmin remoteServerAdmin; + private OPartitionedDatabasePool connectionPool; + private List<ODocument> docBatch = new ArrayList<>(); + + /** + * Initialize the OrientDB dataStore by {@link Properties} parameters. + * + * @param keyClass key class type for dataStore. + * @param persistentClass persistent class type for dataStore. + * @param properties OrientDB dataStore properties EG:- OrientDB client credentials. 
+ */ + @Override + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { + super.initialize(keyClass, persistentClass, properties); + try { + orientDbStoreParams = OrientDBStoreParameters.load(properties); + ROOT_URL = "remote:".concat(orientDbStoreParams.getServerHost()).concat(":") + .concat(orientDbStoreParams.getServerPort()); + ROOT_DATABASE_URL = ROOT_URL.concat("/").concat(orientDbStoreParams.getDatabaseName()); + remoteServerAdmin = new OServerAdmin(ROOT_URL).connect(orientDbStoreParams.getUserName(), + orientDbStoreParams.getUserPassword()); + if (!remoteServerAdmin.existsDatabase(orientDbStoreParams.getDatabaseName(), "memory")) { + remoteServerAdmin.createDatabase(orientDbStoreParams.getDatabaseName(), "document", "memory"); + } + + if (orientDbStoreParams.getConnectionPoolSize() != null) { + int connPoolSize = Integer.valueOf(orientDbStoreParams.getConnectionPoolSize()); + connectionPool = new OPartitionedDatabasePoolFactory(connPoolSize) + .get(ROOT_DATABASE_URL, orientDbStoreParams.getUserName(), + orientDbStoreParams.getUserPassword()); + } else { + connectionPool = new OPartitionedDatabasePoolFactory().get(ROOT_DATABASE_URL, + orientDbStoreParams.getUserName(), orientDbStoreParams.getUserPassword()); + } + + OrientDBMappingBuilder<K, T> builder = new OrientDBMappingBuilder<>(this); + orientDBMapping = builder.fromFile(orientDbStoreParams.getMappingFile()).build(); + + if (!schemaExists()) { + createSchema(); + } + } catch (Exception e) { + LOG.error("Error while initializing OrientDB dataStore: {}", + new Object[]{e.getMessage()}); + throw new RuntimeException(e); + } + } + + @Override + public String getSchemaName(final String mappingSchemaName, + final Class<?> persistentClass) { + return super.getSchemaName(mappingSchemaName, persistentClass); + } + + @Override + public String getSchemaName() { + return orientDBMapping.getDocumentClass(); + } + + /** + * Create a new class of OrientDB documents if necessary. 
Enforce specified schema over the document class. + * + */ + @Override + public void createSchema() { + if (schemaExists()) { + return; + } + + ODatabaseDocumentTx schemaTx = connectionPool.acquire(); + schemaTx.activateOnCurrentThread(); + try { + + OClass documentClass = schemaTx.getMetadata().getSchema().createClass(orientDBMapping.getDocumentClass()); + documentClass.createProperty("_id", + OType.getTypeByClass(super.getKeyClass())).createIndex(OClass.INDEX_TYPE.UNIQUE); + for (String docField : orientDBMapping.getDocumentFields()) { + documentClass.createProperty(docField, + OType.valueOf(orientDBMapping.getDocumentFieldType(docField).name())); + } + schemaTx.getMetadata().getSchema().reload(); + } finally { + schemaTx.close(); + } + } + + /** + * Deletes enforced schema over OrientDB Document class. + * + */ + @Override + public void deleteSchema() { + ODatabaseDocumentTx schemaTx = connectionPool.acquire(); + schemaTx.activateOnCurrentThread(); + try { + schemaTx.getMetadata().getSchema().dropClass(orientDBMapping.getDocumentClass()); + } finally { + schemaTx.close(); + } + } + + /** + * Check whether there exist a schema enforced over OrientDB document class. 
+ * + */ + @Override + public boolean schemaExists() { + ODatabaseDocumentTx schemaTx = connectionPool.acquire(); + schemaTx.activateOnCurrentThread(); + try { + return schemaTx.getMetadata().getSchema() + .existsClass(orientDBMapping.getDocumentClass()); + } finally { + schemaTx.close(); + } + } + + @Override + public T get(K key, String[] fields) { + String[] dbFields = getFieldsToQuery(fields); + com.github.raymanrt.orientqb.query.Query selectQuery = new com.github.raymanrt.orientqb.query.Query(); + for (String k : dbFields) { + String dbFieldName = orientDBMapping.getDocumentField(k); + if (dbFieldName != null && dbFieldName.length() > 0) { + selectQuery.select(dbFieldName); + } + } + selectQuery.from(orientDBMapping.getDocumentClass()) + .where(projection("_id").eq(Parameter.parameter("key"))); + Map<String, Object> params = new HashMap<String, Object>(); + params.put("key", key); + OSQLSynchQuery<ODocument> query = new OSQLSynchQuery<ODocument>(selectQuery.toString()); + ODatabaseDocumentTx selectTx = connectionPool.acquire(); + selectTx.activateOnCurrentThread(); + try { + List<ODocument> result = selectTx.command(query).execute(params); + if (result.size() == 1) { + return convertOrientDocToAvroBean(result.get(0), dbFields); + } else { + return null; + } + } finally { + selectTx.close(); + } + } + + @Override + public void put(K key, T val) { + if (val.isDirty()) { + OrientDBQuery<K, T> dataStoreQuery = new OrientDBQuery<>(this); + dataStoreQuery.setStartKey(key); + dataStoreQuery.setEndKey(key); + dataStoreQuery.populateOrientDBQuery(orientDBMapping, getFieldsToQuery(null), getFields()); + + ODatabaseDocumentTx selectTx = connectionPool.acquire(); + selectTx.activateOnCurrentThread(); + try { + List<ODocument> result = selectTx.command(dataStoreQuery.getOrientDBQuery()) + .execute(dataStoreQuery.getParams()); + if (result.size() == 1) { + ODocument document = updateOrientDocFromAvroBean(key, val, result.get(0)); + docBatch.add(document); + } else { + 
ODocument document = convertAvroBeanToOrientDoc(key, val); + docBatch.add(document); + } + } finally { + selectTx.close(); + } + } else { + LOG.info("Ignored putting persistent bean {} in the store as it is neither " + + "new, neither dirty.", new Object[]{val}); + } + } + + @Override + public boolean delete(K key) { + Delete delete = new Delete(); + delete.from(orientDBMapping.getDocumentClass()) + .where(projection("_id").eq(Parameter.parameter("key"))); + Map<String, Object> params = new HashMap<String, Object>(); + params.put("key", key); + OCommandSQL query = new OCommandSQL(delete.toString().replace("DELETE", "DELETE FROM")); + ODatabaseDocumentTx deleteTx = connectionPool.acquire(); + deleteTx.activateOnCurrentThread(); + try { + int deleteCount = deleteTx.command(query).execute(params); + if (deleteCount == 1) { + return true; + } else { + return false; + } + } finally { + deleteTx.close(); + } + } + + @Override + public long deleteByQuery(Query<K, T> query) { + Delete delete = new Delete(); + delete.from(orientDBMapping.getDocumentClass()); + Map<String, Object> params = new HashMap<String, Object>(); + if (query.getFields() == null || (query.getFields().length == getFields().length)) { + if (query.getStartKey() != null) { + delete.where(projection("_id").ge(Parameter.parameter("start"))); + params.put("start", query.getStartKey()); + } + if (query.getEndKey() != null) { + delete.where(projection("_id").le(Parameter.parameter("end"))); + params.put("end", query.getEndKey()); + } + + OCommandSQL dbQuery = new OCommandSQL(delete.toString().replace("DELETE", "DELETE FROM")); + ODatabaseDocumentTx deleteTx = connectionPool.acquire(); + deleteTx.activateOnCurrentThread(); + try { + int deleteCount; + if (params.isEmpty()) { + deleteCount = deleteTx.command(dbQuery).execute(); + } else { + deleteCount = deleteTx.command(dbQuery).execute(params); + } + if (deleteCount > 0) { + return deleteCount; + } else { + return 0; + } + } finally { + deleteTx.close(); + } + 
} else { + + OrientDBQuery<K, T> dataStoreQuery = new OrientDBQuery<>(this); + dataStoreQuery.setStartKey(query.getStartKey()); + dataStoreQuery.setEndKey(query.getEndKey()); + dataStoreQuery.populateOrientDBQuery(orientDBMapping, getFieldsToQuery(null), getFields()); + + ODatabaseDocumentTx selectTx = connectionPool.acquire(); + selectTx.activateOnCurrentThread(); + try { + List<ODocument> result = selectTx.command(dataStoreQuery.getOrientDBQuery()) + .execute(dataStoreQuery.getParams()); + /* A null result is treated like an empty one so the loop below never iterates null. */ + if (result == null || result.isEmpty()) { + return 0; + } else { + for (ODocument doc : result) { + for (String docField : query.getFields()) { + if (doc.containsField(orientDBMapping.getDocumentField(docField))) { + doc.removeField(orientDBMapping.getDocumentField(docField)); + } + } + doc.save(); + } + return result.size(); + } + } finally { + selectTx.close(); + } + } + } + + @Override + public Result<K, T> execute(Query<K, T> query) { + String[] fields = getFieldsToQuery(query.getFields()); + OrientDBQuery dataStoreQuery; + if (query instanceof OrientDBQuery) { + dataStoreQuery = ((OrientDBQuery) query); + } else { + dataStoreQuery = (OrientDBQuery) ((PartitionQueryImpl<K, T>) query).getBaseQuery(); + } + dataStoreQuery.populateOrientDBQuery(orientDBMapping, fields, getFields()); + ODatabaseDocumentTx selectTx = connectionPool.acquire(); + selectTx.activateOnCurrentThread(); + try { + OConcurrentResultSet<ODocument> result = selectTx.command(dataStoreQuery.getOrientDBQuery()) + .execute(dataStoreQuery.getParams()); + result.setLimit((int) query.getLimit()); + return new OrientDBResult<K, T>(this, query, result); + } finally { + selectTx.close(); + } + } + + /** + * Creates a new query bound to this dataStore, with its fields set to the value of + * {@code getFieldsToQuery(null)}. + * + * @return the configured {@link OrientDBQuery} instance. + */ + @Override + public Query<K, T> newQuery() { + OrientDBQuery<K, T> query = new OrientDBQuery<K, T>(this); + query.setFields(getFieldsToQuery(null)); + return query; + } + + @Override + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { + // TODO : Improve code 
on OrientDB clusters + List<PartitionQuery<K, T>> partitions = new ArrayList<>(); + PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>( + query); + partitionQuery.setConf(this.getConf()); + partitions.add(partitionQuery); + return partitions; + } + + /** + * Flushes locally cached content held in memory to the remote OrientDB server. + * Every document queued in the batch is saved; the batch is cleared afterwards, even when a save fails. + */ + @Override + public void flush() { + ODatabaseDocumentTx updateTx = connectionPool.acquire(); + updateTx.activateOnCurrentThread(); + try { + for (ODocument document : docBatch) { + updateTx.save(document); + } + } finally { + updateTx.close(); + docBatch.clear(); + } + } + + /** + * Releases resources which have been used by the dataStore. Eg:- OrientDB Client connection pool. + * Documents still queued in the batch are discarded here, not flushed. + */ + @Override + public void close() { + docBatch.clear(); + remoteServerAdmin.close(); + connectionPool.close(); + } + + /** + * Returns OrientDB client connection pool maintained at Gora dataStore. + * + * @return {@link OPartitionedDatabasePool} OrientDB client connection pool. 
+ */ + public OPartitionedDatabasePool getConnectionPool() { + return connectionPool; + } + + public T convertOrientDocToAvroBean(final ODocument obj, final String[] fields) { + T persistent = newPersistent(); + String[] dbFields = getFieldsToQuery(fields); + for (String f : dbFields) { + String docf = orientDBMapping.getDocumentField(f); + if (docf == null || !obj.containsField(docf)) + continue; + + OrientDBMapping.DocumentFieldType storeType = orientDBMapping.getDocumentFieldType(docf); + Schema.Field field = fieldMap.get(f); + Schema fieldSchema = field.schema(); + + LOG.debug("Load from ODocument, field:{}, schemaType:{}, docField:{}, storeType:{}", + new Object[]{field.name(), fieldSchema.getType(), docf, storeType}); + Object result = convertDocFieldToAvroField(fieldSchema, storeType, field, docf, obj); + persistent.put(field.pos(), result); + } + persistent.clearDirty(); + return persistent; + } + + private Object convertDocFieldToAvroField(final Schema fieldSchema, + final OrientDBMapping.DocumentFieldType storeType, + final Schema.Field field, + final String docf, + final ODocument obj) { + Object result = null; + switch (fieldSchema.getType()) { + case MAP: + result = convertDocFieldToAvroMap(docf, fieldSchema, obj, field, storeType); + break; + case ARRAY: + result = convertDocFieldToAvroList(docf, fieldSchema, obj, field, storeType); + break; + case RECORD: + ODocument record = obj.field(docf); + if (record == null) { + result = null; + break; + } + result = convertAvroBeanToOrientDoc(fieldSchema, record); + break; + case BOOLEAN: + result = OType.convert(obj.field(docf), Boolean.class); + break; + case DOUBLE: + result = OType.convert(obj.field(docf), Double.class); + break; + case FLOAT: + result = OType.convert(obj.field(docf), Float.class); + break; + case INT: + result = OType.convert(obj.field(docf), Integer.class); + break; + case LONG: + result = OType.convert(obj.field(docf), Long.class); + break; + case STRING: + result = 
convertDocFieldToAvroString(storeType, docf, obj); + break; + case ENUM: + result = AvroUtils.getEnumValue(fieldSchema, obj.field(docf)); + break; + case BYTES: + case FIXED: + if (obj.field(docf) == null) { + result = null; + break; + } + result = ByteBuffer.wrap((byte[]) obj.field(docf)); + break; + case NULL: + result = null; + break; + case UNION: + result = convertDocFieldToAvroUnion(fieldSchema, storeType, field, docf, obj); + break; + default: + LOG.warn("Unable to read {}", docf); + break; + } + return result; + } + + private Object convertDocFieldToAvroList(final String docf, + final Schema fieldSchema, + final ODocument doc, + final Schema.Field f, + final OrientDBMapping.DocumentFieldType storeType) { + if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDSET) { + OTrackedSet<Object> set = doc.field(docf); + List<Object> rlist = new ArrayList<>(); + if (set == null) { + return new DirtyListWrapper(rlist); + } + + for (Object item : set) { + Object o = convertDocFieldToAvroField(fieldSchema.getElementType(), storeType, f, + "item", new ODocument("item", item)); + rlist.add(o); + } + return new DirtyListWrapper<>(rlist); + + } else { + OTrackedList<Object> list = doc.field(docf); + List<Object> rlist = new ArrayList<>(); + if (list == null) { + return new DirtyListWrapper(rlist); + } + + for (Object item : list) { + Object o = convertDocFieldToAvroField(fieldSchema.getElementType(), storeType, f, + "item", new ODocument("item", item)); + rlist.add(o); + } + return new DirtyListWrapper<>(rlist); + } + } + + private Object convertAvroListToDocField(final String docf, final Collection<?> array, + final Schema fieldSchema, final Schema.Type fieldType, + final OrientDBMapping.DocumentFieldType storeType) { + if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDLIST) { + ArrayList list; + list = new ArrayList<Object>(); + if (array == null) + return list; + for (Object item : array) { + OrientDBMapping.DocumentFieldType fieldStoreType = 
orientDBMapping.getDocumentFieldType(docf); + Object result = convertAvroFieldToOrientField(docf, fieldSchema, fieldType, fieldStoreType, item); + list.add(result); + } + return list; + } else if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDSET) { + HashSet set; + set = new HashSet<Object>(); + if (array == null) + return set; + for (Object item : array) { + OrientDBMapping.DocumentFieldType fieldStoreType = orientDBMapping.getDocumentFieldType(docf); + Object result = convertAvroFieldToOrientField(docf, fieldSchema, fieldType, fieldStoreType, item); + set.add(result); + } + return set; + } + return null; + } + + private Object convertDocFieldToAvroMap(final String docf, final Schema fieldSchema, + final ODocument doc, final Schema.Field f, + final OrientDBMapping.DocumentFieldType storeType) { + if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDMAP) { + OTrackedMap<Object> map = doc.field(docf); + Map<Utf8, Object> rmap = new HashMap<>(); + if (map == null) { + return new DirtyMapWrapper(rmap); + } + + for (Map.Entry entry : map.entrySet()) { + String mapKey = decodeFieldKey((String) entry.getKey()); + Object o = convertDocFieldToAvroField(fieldSchema.getValueType(), storeType, f, mapKey, + decorateOTrackedMapToODoc(map)); + rmap.put(new Utf8(mapKey), o); + } + return new DirtyMapWrapper<>(rmap); + } else { + ODocument innerDoc = doc.field(docf); + Map<Utf8, Object> rmap = new HashMap<>(); + if (innerDoc == null) { + return new DirtyMapWrapper(rmap); + } + + for (String fieldName : innerDoc.fieldNames()) { + String mapKey = decodeFieldKey(fieldName); + Object o = convertDocFieldToAvroField(fieldSchema.getValueType(), storeType, f, mapKey, + innerDoc); + rmap.put(new Utf8(mapKey), o); + } + return new DirtyMapWrapper<>(rmap); + } + } + + private ODocument decorateOTrackedMapToODoc(OTrackedMap<Object> map) { + ODocument doc = new ODocument(); + for (Map.Entry entry : map.entrySet()) { + doc.field((String) entry.getKey(), entry.getValue()); + } + 
return doc; + } + + private Object convertAvroMapToDocField(final String docf, + final Map<CharSequence, ?> value, final Schema fieldSchema, + final Schema.Type fieldType, + final OrientDBMapping.DocumentFieldType storeType) { + if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDMAP) { + HashMap map = new HashMap<String, Object>(); + if (value == null) + return map; + + for (Map.Entry<CharSequence, ?> e : value.entrySet()) { + String mapKey = encodeFieldKey(e.getKey().toString()); + Object mapValue = e.getValue(); + + OrientDBMapping.DocumentFieldType fieldStoreType = orientDBMapping.getDocumentFieldType(docf); + Object result = convertAvroFieldToOrientField(docf, fieldSchema, fieldType, fieldStoreType, + mapValue); + map.put(mapKey, result); + } + return map; + } else { + ODocument doc = new ODocument(); + if (value == null) + return doc; + for (Map.Entry<CharSequence, ?> e : value.entrySet()) { + String mapKey = encodeFieldKey(e.getKey().toString()); + Object mapValue = e.getValue(); + + OrientDBMapping.DocumentFieldType fieldStoreType = orientDBMapping.getDocumentFieldType(docf); + Object result = convertAvroFieldToOrientField(docf, fieldSchema, fieldType, fieldStoreType, + mapValue); + doc.field(mapKey, result); + } + return doc; + } + } + + private Object convertAvroBeanToOrientDoc(final Schema fieldSchema, + final ODocument doc) { + Object result; + Class<?> clazz = null; + try { + clazz = ClassLoadingUtils.loadClass(fieldSchema.getFullName()); + } catch (ClassNotFoundException e) { + //Ignore + } + PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent(); + for (Schema.Field recField : fieldSchema.getFields()) { + Schema innerSchema = recField.schema(); + OrientDBMapping.DocumentFieldType innerStoreType = orientDBMapping + .getDocumentFieldType(recField.name()); + String innerDocField = orientDBMapping.getDocumentField(recField.name()) != null ? 
orientDBMapping + .getDocumentField(recField.name()) : recField.name(); + LOG.debug("Load from ODocument (RECORD), field:{}, schemaType:{}, docField:{}, storeType:{}", + new Object[]{recField.name(), innerSchema.getType(), innerDocField, + innerStoreType}); + record.put(recField.pos(), + convertDocFieldToAvroField(innerSchema, innerStoreType, recField, innerDocField, + doc)); + } + result = record; + return result; + } + + /** + * Reads a document field as an Avro string. DATE / DATETIME store types are rendered through + * {@link DatatypeConverter#printDateTime} on a UTC calendar; other store types are read as a String. + * + * @return the value wrapped in {@link Utf8}, or null when the document holds no value for the field. + */ + private Object convertDocFieldToAvroString(final OrientDBMapping.DocumentFieldType storeType, + final String docf, final ODocument doc) { + Object result = null; + if (storeType == OrientDBMapping.DocumentFieldType.DATE || + storeType == OrientDBMapping.DocumentFieldType.DATETIME) { + Date dateTime = doc.field(docf); + if (dateTime != null) { + Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.getDefault()); + calendar.setTime(dateTime); + result = new Utf8(DatatypeConverter.printDateTime(calendar)); + } + } else { + Object text = doc.field(encodeFieldKey(docf)); + if (text != null) { + result = new Utf8((String) text); + } + } + return result; + } + + private Object convertDocFieldToAvroUnion(final Schema fieldSchema, + final OrientDBMapping.DocumentFieldType storeType, + final Schema.Field field, + final String docf, + final ODocument doc) { + Object result; + Schema.Type type0 = fieldSchema.getTypes().get(0).getType(); + Schema.Type type1 = fieldSchema.getTypes().get(1).getType(); + + if (!type0.equals(type1) + && (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) { + Schema innerSchema = null; + if (type0.equals(Schema.Type.NULL)) { + innerSchema = fieldSchema.getTypes().get(1); + } else { + innerSchema = fieldSchema.getTypes().get(0); + } + + LOG.debug("Load from ODocument (UNION), schemaType:{}, docField:{}, storeType:{}", + new Object[]{innerSchema.getType(), docf, storeType}); + + result = convertDocFieldToAvroField(innerSchema, storeType, field, docf, doc); + } else { + throw new IllegalStateException("OrientDBStore only supports Union of two types field."); + } + return 
result; + } + + /** + * Converts an Avro union value into its OrientDB field representation. Only unions whose first + * two branches differ and include NULL are supported; the non-NULL branch drives the conversion. + * + * @throws IllegalStateException for any other union shape. + */ + private Object convertAvroUnionToOrientDBField(final String docf, final Schema fieldSchema, + final OrientDBMapping.DocumentFieldType storeType, + final Object value) { + Object result; + Schema.Type type0 = fieldSchema.getTypes().get(0).getType(); + Schema.Type type1 = fieldSchema.getTypes().get(1).getType(); + + if (!type0.equals(type1) + && (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) { + Schema innerSchema = null; + if (type0.equals(Schema.Type.NULL)) { + innerSchema = fieldSchema.getTypes().get(1); + } else { + innerSchema = fieldSchema.getTypes().get(0); + } + + LOG.debug("Transform value to ODocument (UNION), type:{}, storeType:{}", + new Object[]{innerSchema.getType(), storeType}); + + result = convertAvroFieldToOrientField(docf, innerSchema, innerSchema.getType(), storeType, value); + } else { + throw new IllegalStateException("OrientDBStore only supports Union of two types field."); + } + return result; + } + + private ODocument convertAvroBeanToOrientDoc(final K key, final T persistent) { + ODocument result = new ODocument(orientDBMapping.getDocumentClass()); + for (Schema.Field f : persistent.getSchema().getFields()) { + if (persistent.isDirty(f.pos()) && (persistent.get(f.pos()) != null)) { + String docf = orientDBMapping.getDocumentField(f.name()); + Object value = persistent.get(f.pos()); + OrientDBMapping.DocumentFieldType storeType = orientDBMapping.getDocumentFieldType(docf); + LOG.debug("Transform value to ODocument, docField:{}, schemaType:{}, storeType:{}", + new Object[]{docf, f.schema().getType(), storeType}); + Object o = convertAvroFieldToOrientField(docf, f.schema(), f.schema().getType(), + storeType, value); + result.field(docf, o); + } + } + result.field("_id", key); + return result; + } + + private ODocument updateOrientDocFromAvroBean(final K key, final T persistent, final ODocument result) { + for (Schema.Field f : persistent.getSchema().getFields()) { + if (persistent.isDirty(f.pos()) 
/*&& (persistent.get(f.pos()) != null)*/) { + String docf = orientDBMapping.getDocumentField(f.name()); + if (persistent.get(f.pos()) == null) { + result.removeField(docf); + continue; + } + Object value = persistent.get(f.pos()); + OrientDBMapping.DocumentFieldType storeType = orientDBMapping.getDocumentFieldType(docf); + LOG.debug("Transform value to ODocument, docField:{}, schemaType:{}, storeType:{}", + new Object[]{docf, f.schema().getType(), storeType}); + Object o = convertAvroFieldToOrientField(docf, f.schema(), f.schema().getType(), + storeType, value); + result.field(docf, o); + } + } + return result; + } + + /** + * Converts a single Avro field value into the value stored in the OrientDB document, + * dispatching on the Avro schema type. + * + * @throws IllegalStateException when a MAP or ARRAY field is mapped to an incompatible store type. + */ + private Object convertAvroFieldToOrientField(final String docf, final Schema fieldSchema, + final Schema.Type fieldType, + final OrientDBMapping.DocumentFieldType storeType, + final Object value) { + Object result = null; + switch (fieldType) { + case MAP: + if (storeType != null && !(storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDMAP || + storeType == OrientDBMapping.DocumentFieldType.EMBEDDED)) { + throw new IllegalStateException( + "Field " + fieldSchema.getName() + + ": to store a AVRO 'map', target OrientDB mapping have to be of type 'EmbeddedMap'" + + "| 'Embedded'"); + } + Schema valueSchema = fieldSchema.getValueType(); + result = convertAvroMapToDocField(docf, (Map<CharSequence, ?>) value, valueSchema, + valueSchema.getType(), storeType); + break; + case ARRAY: + if (storeType != null && !(storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDLIST || + storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDSET)) { + throw new IllegalStateException("Field " + fieldSchema.getName() + + ": To store a AVRO 'array', target OrientDB mapping have to be of type 'EmbeddedList'" + + "|'EmbeddedSet'"); + } + Schema elementSchema = fieldSchema.getElementType(); + result = convertAvroListToDocField(docf, (List<?>) value, elementSchema, + elementSchema.getType(), storeType); + break; + case BYTES: + if (value != null) { + result = ((ByteBuffer) 
value).array(); + } + break; + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + result = value; + break; + case STRING: + result = convertAvroStringToDocField(fieldSchema, storeType, value); + break; + case ENUM: + if (value != null) + result = value.toString(); + break; + case RECORD: + if (value == null) + break; + result = convertAvroBeanToOrientDoc(docf, fieldSchema, value); + break; + case UNION: + result = convertAvroUnionToOrientDBField(docf, fieldSchema, storeType, value); + break; + case FIXED: + result = value; + break; + default: + LOG.error("Unknown field type: {}", fieldSchema.getType()); + break; + } + return result; + } + + private Object convertAvroStringToDocField(final Schema fieldSchema, + final OrientDBMapping.DocumentFieldType storeType, + final Object value) { + Object result = null; + if (storeType == OrientDBMapping.DocumentFieldType.DATETIME) { + if (value != null) { + Calendar dateTime = null; + try { + dateTime = DatatypeConverter.parseDateTime(value.toString()); + } catch (IllegalArgumentException e) { + throw new IllegalStateException("Field " + fieldSchema.getType() + + ": Invalid date and time format '" + value + "'", e); + + } + result = dateTime.getTime(); + } + } else if (storeType == OrientDBMapping.DocumentFieldType.DATE) { + Calendar date = null; + try { + date = DatatypeConverter.parseDate(value.toString()); + } catch (IllegalArgumentException e) { + throw new IllegalStateException("Field " + fieldSchema.getType() + + ": Invalid date format '" + value + "'", e); + } + result = date.getTime(); + } else { + if (value != null) { + result = value.toString(); + } + } + return result; + } + + private ODocument convertAvroBeanToOrientDoc(final String docf, + final Schema fieldSchema, + final Object value) { + ODocument record = new ODocument(); + for (Schema.Field member : fieldSchema.getFields()) { + Object innerValue = ((PersistentBase) value).get(member.pos()); + String innerDoc = 
orientDBMapping.getDocumentField(member.name()); + Schema.Type innerType = member.schema().getType(); + OrientDBMapping.DocumentFieldType innerStoreType = orientDBMapping.getDocumentFieldType(innerDoc); + LOG.debug("Transform value to ODocument , docField:{}, schemaType:{}, storeType:{}", + new Object[]{member.name(), member.schema().getType(), + innerStoreType}); + Object fieldValue = convertAvroFieldToOrientField(docf, member.schema() + , innerType, innerStoreType, innerValue); + record.field(member.name(), fieldValue); + } + return record; + } + + private String encodeFieldKey(final String key) { + if (key == null) { + return null; + } + return key.replace(".", "\u00B7") + .replace(":", "\u00FF") + .replace(";", "\u00FE") + .replace(" ", "\u00FD") + .replace("%", "\u00FC") + .replace("=", "\u00FB"); + } + + private String decodeFieldKey(final String key) { + if (key == null) { + return null; + } + return key.replace("\u00B7", ".") + .replace("\u00FF", ":") + .replace("\u00FE", ";") + .replace("\u00FD", " ") + .replace("\u00FC", "%") + .replace("\u00FB", "="); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java new file mode 100644 index 0000000..f6e18bc --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.store; + +import java.util.Properties; + +/** + * Maintains OrientDB client related properties parsed from gora.properties. + */ +public class OrientDBStoreParameters { + + public static final String ORIENT_DB_MAPPING_FILE = "gora.orientdb.mapping.file"; + public static final String ORIENT_DB_SERVER_HOST = "gora.orientdb.server.host"; + public static final String ORIENT_DB_SERVER_PORT = "gora.orientdb.server.port"; + public static final String ORIENT_DB_USER_USERNAME = "gora.orientdb.user.username"; + public static final String ORIENT_DB_USER_PASSWORD = "gora.orientdb.user.password"; + public static final String ORIENT_DB_DB_NAME = "gora.orientdb.database.name"; + public static final String ORIENT_DB_CONNECTION_POOL_SIZE = "gora.orientdb.con.pool.size"; + public static final String ORIENT_DB_STORAGE_TYPE = "gora.orientdb.storage.type"; + + + private String mappingFile; + private String serverHost; + private String serverPort; + private String userName; + private String userPassword; + private String databaseName; + private String connPoolSize; + private String storageType; + + + /** + * Return classpath or file system location for OrientDB mapping file. Eg:- /gora-orientdb-mapping.xml + * + * @return OrientDB Mapping file Location as string. 
+ */ + public String getMappingFile() { + return this.mappingFile; + } + + /** + * Return remote OrientDB server host name. Eg:- localhost + * + * @return OrientDB remote server host as string. + */ + public String getServerHost() { + return this.serverHost; + } + + /** + * Return remote OrientDB server port number. Eg:- 2424 + * + * @return OrientDB remote server port number as string. + */ + public String getServerPort() { + return this.serverPort; + } + + /** + * Return remote OrientDB server client connecting user username. Eg:- admin + * + * @return OrientDB remote server client connecting user username as string. + */ + public String getUserName() { + return this.userName; + } + + /** + * Return remote OrientDB server client connecting user password. Eg:- admin + * + * @return OrientDB remote server client connecting user pass as string. + */ + public String getUserPassword() { + return this.userPassword; + } + + /** + * Return remote OrientDB server pointing database name. Eg:- gora + * + * @return OrientDB remote server pointing database name as string. + */ + public String getDatabaseName() { + return this.databaseName; + } + + /** + * Return remote OrientDB client connections pool size. Eg:- 80 + * + * @return OrientDB remote server client connections pool size as string. + */ + public String getConnectionPoolSize() { + return this.connPoolSize; + } + + /** + * Return remote OrientDB server storage type of pointing database. Eg:- plocal, memory + * + * @return OrientDB remote server storage type of pointing database. 
+ */ + public String getStorageType() { + return this.storageType; + } + + public OrientDBStoreParameters(String mappingFile, + String serverHost, + String serverPort, + String userName, + String userPassword, + String databaseName, + String connPoolSize, + String storageType) { + this.mappingFile = mappingFile; + this.serverHost = serverHost; + this.serverPort = serverPort; + this.userName = userName; + this.userPassword = userPassword; + this.databaseName = databaseName; + this.connPoolSize = connPoolSize; + this.storageType = storageType; + } + + /** + * Extracts OrientDB dataStore properties from the {@link Properties} of the gora.properties file. + * + * @return OrientDB client properties encapsulated inside an instance of {@link OrientDBStoreParameters} + */ + public static OrientDBStoreParameters load(Properties properties) { + String propMappingFile = properties.getProperty(ORIENT_DB_MAPPING_FILE, + OrientDBStore.DEFAULT_MAPPING_FILE); + String propServerHost = properties.getProperty(ORIENT_DB_SERVER_HOST); + String propServerPort = properties.getProperty(ORIENT_DB_SERVER_PORT); + String propUserName = properties.getProperty(ORIENT_DB_USER_USERNAME); + String propUserPassword = properties.getProperty(ORIENT_DB_USER_PASSWORD); + String propDatabaseName = properties.getProperty(ORIENT_DB_DB_NAME); + String propConnPoolSize = properties.getProperty(ORIENT_DB_CONNECTION_POOL_SIZE); + String propStorageType = properties.getProperty(ORIENT_DB_STORAGE_TYPE); + return new OrientDBStoreParameters(propMappingFile, + propServerHost, propServerPort, propUserName, + propUserPassword, propDatabaseName, propConnPoolSize, propStorageType); + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/package-info.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/package-info.java
b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/package-info.java new file mode 100644 index 0000000..60871b0 --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains main OrientDB dataStore class and dataStore mapping representing class, and + * utility classes to build dataStore specific mappings. + */ +package org.apache.gora.orientdb.store; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/test/java/org/apache/gora/orientdb/GoraOrientDBTestDriver.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/test/java/org/apache/gora/orientdb/GoraOrientDBTestDriver.java b/gora-orientdb/src/test/java/org/apache/gora/orientdb/GoraOrientDBTestDriver.java new file mode 100644 index 0000000..a3f024c --- /dev/null +++ b/gora-orientdb/src/test/java/org/apache/gora/orientdb/GoraOrientDBTestDriver.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb; + +import com.orientechnologies.orient.server.OServer; +import com.orientechnologies.orient.server.OServerMain; +import org.apache.gora.GoraTestDriver; +import org.apache.gora.orientdb.store.OrientDBStore; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.store.DataStore; +import org.apache.gora.util.GoraException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Driver to set up an embedded OrientDB database instance for Gora + * dataStore specific integration tests. + */ +public class GoraOrientDBTestDriver extends GoraTestDriver { + + private static Logger log = LoggerFactory.getLogger(GoraOrientDBTestDriver.class); + private static final String SERVER_DIRECTORY = "./target/db"; + private static final String SERVER_CONFIGURATION = "/orientdb-server-config.xml"; + private OServer server; + + public GoraOrientDBTestDriver() { + super(OrientDBStore.class); + } + + /** + * Initialize embedded OrientDB server instance as per the orientdb-server-config.xml
+ */ + @Override + public void setUpClass() throws Exception { + server = OServerMain.create(); + server.setServerRootDirectory(SERVER_DIRECTORY); + server.startup(getClass().getResourceAsStream(SERVER_CONFIGURATION)); + server.activate(); + log.info("OrientDB Embedded Server started successfully."); + } + + /** + * Terminate embedded OrientDB server. + */ + @Override + public void tearDownClass() throws Exception { + server.shutdown(); + log.info("OrientDB Embedded Server terminated successfully."); + } + + @Override + public <K, T extends Persistent> DataStore<K, T> + createDataStore(Class<K> keyClass, Class<T> persistentClass) throws GoraException { + OrientDBStore store = (OrientDBStore) super.createDataStore(keyClass, persistentClass); + return store; + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/OrientDBStoreMapReduceTest.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/OrientDBStoreMapReduceTest.java b/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/OrientDBStoreMapReduceTest.java new file mode 100644 index 0000000..7819d38 --- /dev/null +++ b/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/OrientDBStoreMapReduceTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.mapreduce; + +import org.apache.gora.mapreduce.DataStoreMapReduceTestBase; +import org.apache.gora.examples.generated.WebPage; +import org.apache.gora.orientdb.GoraOrientDBTestDriver; +import org.apache.gora.store.DataStore; +import org.apache.gora.store.DataStoreFactory; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; + +/** + * Executes tests for MR jobs over OrientDB dataStore. + */ +public class OrientDBStoreMapReduceTest extends DataStoreMapReduceTestBase { + + private GoraOrientDBTestDriver driver; + + public OrientDBStoreMapReduceTest() throws IOException { + super(); + driver = new GoraOrientDBTestDriver(); + } + + @Override + @Before + public void setUp() throws Exception { + driver.setUpClass(); + super.setUp(); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + driver.tearDownClass(); + } + + @Override + protected DataStore<String, WebPage> createWebPageDataStore() throws IOException { + try { + return DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/package-info.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/package-info.java 
b/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/package-info.java new file mode 100644 index 0000000..71c5962 --- /dev/null +++ b/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains tests for MR jobs over OrientDB dataStore. + */ +package org.apache.gora.orientdb.mapreduce; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/test/java/org/apache/gora/orientdb/package-info.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/test/java/org/apache/gora/orientdb/package-info.java b/gora-orientdb/src/test/java/org/apache/gora/orientdb/package-info.java new file mode 100644 index 0000000..f7ac49d --- /dev/null +++ b/gora-orientdb/src/test/java/org/apache/gora/orientdb/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains all the dataStore specific tests for OrientDB dataStore. + * + */ +package org.apache.gora.orientdb; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/71a95b98/gora-orientdb/src/test/java/org/apache/gora/orientdb/store/OrientDBGoraDataStoreTest.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/test/java/org/apache/gora/orientdb/store/OrientDBGoraDataStoreTest.java b/gora-orientdb/src/test/java/org/apache/gora/orientdb/store/OrientDBGoraDataStoreTest.java new file mode 100644 index 0000000..fd3d15b --- /dev/null +++ b/gora-orientdb/src/test/java/org/apache/gora/orientdb/store/OrientDBGoraDataStoreTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.store; + +import org.apache.gora.orientdb.GoraOrientDBTestDriver; +import org.apache.gora.store.DataStoreTestBase; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Executes all the dataStore specific integration tests for OrientDB dataStore. + */ +public class OrientDBGoraDataStoreTest extends DataStoreTestBase { + + private static final Logger log = LoggerFactory.getLogger(OrientDBGoraDataStoreTest.class); + + @BeforeClass + public static void setUpClass() throws Exception { + setTestDriver(new GoraOrientDBTestDriver()); + DataStoreTestBase.setUpClass(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + DataStoreTestBase.tearDownClass(); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } + + @Ignore("3 types union field is not supported by OrientDBStore.") + @Override + public void testGet3UnionField() throws Exception { + //3 types union field is not supported by OrientDBStore. + } + +}