This is an automated email from the ASF dual-hosted git repository.
djkevincr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gora.git
The following commit(s) were added to refs/heads/master by this push:
new 504833f GORA-320 Hive backend support in Gora (#187)
504833f is described below
commit 504833f99f9bffd3b92bd46a48daebdabf1ccd57
Author: Chanaka Balasooriya <[email protected]>
AuthorDate: Tue Aug 27 21:39:55 2019 +0530
GORA-320 Hive backend support in Gora (#187)
* GORA 320 Hive backend support for Gora
* Add type safety issues
* Configure hive embedded server for testing
* Support delete queries in gora-hive
* Moving hive dependencies to parent pom
* Add hive test server
* Resolve hive dependency issues
* Remove gora-hive test profile
---
.../org/apache/gora/store/DataStoreTestUtil.java | 10 +-
gora-hive/pom.xml | 217 +++++++++
.../java/org/apache/gora/hive/package-info.java | 21 +
.../java/org/apache/gora/hive/query/HiveQuery.java | 40 ++
.../org/apache/gora/hive/query/HiveResult.java | 79 ++++
.../org/apache/gora/hive/query/package-info.java | 21 +
.../apache/gora/hive/store/HiveDataContext.java | 256 ++++++++++
.../org/apache/gora/hive/store/HiveMapping.java | 95 ++++
.../apache/gora/hive/store/HiveMappingBuilder.java | 157 +++++++
.../java/org/apache/gora/hive/store/HiveStore.java | 374 +++++++++++++++
.../gora/hive/store/HiveStoreParameters.java | 123 +++++
.../org/apache/gora/hive/store/package-info.java | 21 +
.../apache/gora/hive/util/HiveQueryBuilder.java | 521 +++++++++++++++++++++
.../apache/gora/hive/util/HiveResultParser.java | 243 ++++++++++
.../org/apache/gora/hive/util/package-info.java | 22 +
.../org/apache/gora/hive/GoraHiveTestDriver.java | 49 ++
.../java/org/apache/gora/hive/package-info.java | 21 +
.../org/apache/gora/hive/store/TestHiveStore.java | 171 +++++++
.../org/apache/gora/hive/store/package-info.java | 21 +
.../org/apache/gora/hive/util/HiveTestServer.java | 109 +++++
.../org/apache/gora/hive/util/package-info.java | 21 +
gora-hive/src/test/resources/gora-hive-mapping.xml | 45 ++
gora-hive/src/test/resources/gora.properties | 28 ++
gora-hive/src/test/resources/hive-site.xml | 85 ++++
pom.xml | 75 ++-
25 files changed, 2817 insertions(+), 8 deletions(-)
diff --git
a/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
b/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
index bb935ba..a452999 100644
--- a/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
+++ b/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
@@ -105,7 +105,7 @@ public class DataStoreTestUtil {
return employee;
}
- private static <K> WebPage createWebPage() {
+ public static <K> WebPage createWebPage() {
WebPage webpage = WebPage.newBuilder().build();
webpage.setUrl(new Utf8("url.."));
webpage.setContent(ByteBuffer.wrap("test
content".getBytes(Charset.defaultCharset())));
@@ -338,7 +338,7 @@ public class DataStoreTestUtil {
* @param employee
* @param after
*/
- private static void assertEqualEmployeeObjects(Employee employee, Employee
after) {
+ public static void assertEqualEmployeeObjects(Employee employee, Employee
after) {
//for (int i = 1; i < employee.SCHEMA$.getFields().size(); i++) {
// for (int j = 1; j < after.SCHEMA$.getFields().size(); j++) {
// assertEquals(employee.SCHEMA$.getFields().get(i),
after.SCHEMA$.getFields().get(j));
@@ -388,7 +388,7 @@ public class DataStoreTestUtil {
* @param beforeWebPage
* @param afterWebPage
*/
- private static void assertEqualWebPageObjects(WebPage beforeWebPage, WebPage
afterWebPage) {
+ public static void assertEqualWebPageObjects(WebPage beforeWebPage, WebPage
afterWebPage) {
//check url field
CharSequence beforeUrl = beforeWebPage.getUrl();
CharSequence afterUrl = afterWebPage.getUrl();
@@ -424,7 +424,7 @@ public class DataStoreTestUtil {
* @param beforeMetadata
* @param afterMetadata
*/
- private static void assertEqualMetadataObjects(Metadata beforeMetadata,
Metadata afterMetadata) {
+ public static void assertEqualMetadataObjects(Metadata beforeMetadata,
Metadata afterMetadata) {
//check version field
int beforeVersion = beforeMetadata.getVersion();
int afterVersion = afterMetadata.getVersion();
@@ -767,7 +767,7 @@ public class DataStoreTestUtil {
" actual=" + CONTENTS[i] + " i=" + i
, Arrays.equals( toByteArray(page.getContent() )
, CONTENTS[i].getBytes(Charset.defaultCharset())));
-
+
List<CharSequence> parsedContent = page.getParsedContent();
assertNotNull(parsedContent);
assertTrue(parsedContent.size() > 0);
diff --git a/gora-hive/pom.xml b/gora-hive/pom.xml
new file mode 100644
index 0000000..e1ae565
--- /dev/null
+++ b/gora-hive/pom.xml
@@ -0,0 +1,217 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+ <artifactId>gora-hive</artifactId>
+ <packaging>bundle</packaging>
+
+ <name>Apache Gora :: Hive</name>
+ <url>http://gora.apache.org</url>
+ <description>The Apache Gora open source framework provides an in-memory
data model and
+ persistence for big data. Gora supports persisting to column stores, key
value stores,
+ document stores and RDBMSs, and analyzing the data with extensive Apache
Hadoop MapReduce
+ support.
+ </description>
+ <inceptionYear>2010</inceptionYear>
+ <organization>
+ <name>The Apache Software Foundation</name>
+ <url>http://www.apache.org/</url>
+ </organization>
+ <issueManagement>
+ <system>JIRA</system>
+ <url>https://issues.apache.org/jira/browse/GORA</url>
+ </issueManagement>
+ <ciManagement>
+ <system>Jenkins</system>
+ <url>https://builds.apache.org/job/Gora-trunk/</url>
+ </ciManagement>
+
+ <properties>
+ <osgi.import>*</osgi.import>
+
<osgi.export>org.apache.gora.hive*;version="${project.version}";-noimport:=true</osgi.export>
+ </properties>
+
+ <build>
+ <directory>target</directory>
+ <outputDirectory>target/classes</outputDirectory>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <testOutputDirectory>target/test-classes</testOutputDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testResources>
+ <testResource>
+ <directory>${project.basedir}/src/test/resources</directory>
+ <includes>
+ <include>**/*</include>
+ </includes>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>${build-helper-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/examples/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <!-- Gora Internal Dependencies -->
+ <dependency>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jdom</groupId>
+ <artifactId>jdom</artifactId>
+ </dependency>
+
+ <!-- Logging Dependencies -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Testing Dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ </dependency>
+
+ <!-- hadoop-common 2.6 is declared only for gora-hive, because declaring it
+ in the parent POM causes backward compatibility issues -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop-common.version}</version>
+ </dependency>
+
+ <!-- Hive Dependencies -->
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-service</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.metamodel</groupId>
+ <artifactId>MetaModel-jdbc</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.metamodel</groupId>
+ <artifactId>MetaModel-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/gora-hive/src/main/java/org/apache/gora/hive/package-info.java
b/gora-hive/src/main/java/org/apache/gora/hive/package-info.java
new file mode 100644
index 0000000..654732e
--- /dev/null
+++ b/gora-hive/src/main/java/org/apache/gora/hive/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains all Hive store related classes.
+ */
+package org.apache.gora.hive;
\ No newline at end of file
diff --git a/gora-hive/src/main/java/org/apache/gora/hive/query/HiveQuery.java
b/gora-hive/src/main/java/org/apache/gora/hive/query/HiveQuery.java
new file mode 100644
index 0000000..7f587ea
--- /dev/null
+++ b/gora-hive/src/main/java/org/apache/gora/hive/query/HiveQuery.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hive.query;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.impl.QueryBase;
+import org.apache.gora.store.DataStore;
+
+/**
+ * Query type used by the Hive data store. It adds no behavior on top of
+ * {@link org.apache.gora.query.impl.QueryBase}; it gives Hive queries their own concrete type
+ * implementing {@link org.apache.gora.query.Query}.
+ *
+ * @param <K> key class type
+ * @param <T> persistent class type
+ */
+public class HiveQuery<K, T extends PersistentBase> extends QueryBase<K, T> {
+
+  /**
+   * Creates a query with no data store attached (the data store is passed on as null).
+   */
+  public HiveQuery() {
+    this(null);
+  }
+
+  /**
+   * Creates a query bound to the given data store.
+   *
+   * @param dataStore data store the query is associated with
+   */
+  public HiveQuery(DataStore<K, T> dataStore) {
+    super(dataStore);
+  }
+}
diff --git a/gora-hive/src/main/java/org/apache/gora/hive/query/HiveResult.java
b/gora-hive/src/main/java/org/apache/gora/hive/query/HiveResult.java
new file mode 100644
index 0000000..203d498
--- /dev/null
+++ b/gora-hive/src/main/java/org/apache/gora/hive/query/HiveResult.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hive.query;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.gora.hive.store.HiveStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.ResultBase;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.Row;
+
+/**
+ * Hive Query Result implementation of the the {@link
org.apache.gora.query.Result} interface.
+ */
+public class HiveResult<K, T extends PersistentBase> extends ResultBase<K, T> {
+
+ private List<Row> results;
+ private int currentRow= 0 ;
+
+ public HiveResult(HiveStore<K, T> dataStore, Query<K, T> query) {
+ super(dataStore, query);
+ }
+
+ public HiveResult(HiveStore<K, T> dataStore, Query<K, T> query, DataSet
dataSet) {
+ super(dataStore, query);
+ results = dataSet.toRows();
+ currentRow = 0;
+ }
+
+ @Override
+ public HiveStore<K, T> getDataStore() {
+ return (HiveStore<K, T>) super.getDataStore();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return currentRow/(size()+1.0f);
+ }
+
+ @Override
+ public int size() {
+ return (results == null) ? 0 : results.size();
+ }
+
+ @Override
+ protected boolean nextInner() throws IOException {
+ try {
+ if (results == null || currentRow == results.size()){
+ return false;
+ }
+ HiveStore<K, T> hiveStore = ((HiveStore<K, T>) dataStore);
+ Row nextRow = results.get(currentRow);
+ key = hiveStore.readKey(nextRow);
+ persistent = hiveStore.readObject(nextRow);
+ currentRow++;
+ return true;
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ }
+}
diff --git
a/gora-hive/src/main/java/org/apache/gora/hive/query/package-info.java
b/gora-hive/src/main/java/org/apache/gora/hive/query/package-info.java
new file mode 100644
index 0000000..fa1e33c
--- /dev/null
+++ b/gora-hive/src/main/java/org/apache/gora/hive/query/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains the classes representing Hive queries and query results.
+ */
+package org.apache.gora.hive.query;
\ No newline at end of file
diff --git
a/gora-hive/src/main/java/org/apache/gora/hive/store/HiveDataContext.java
b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveDataContext.java
new file mode 100644
index 0000000..256d1af
--- /dev/null
+++ b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveDataContext.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hive.store;
+
+
+import java.lang.invoke.MethodHandles;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.gora.util.GoraException;
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.UpdateScript;
+import org.apache.metamodel.UpdateSummary;
+import org.apache.metamodel.UpdateableDataContext;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.factory.DataContextPropertiesImpl;
+import org.apache.metamodel.jdbc.JdbcDataContext;
+import org.apache.metamodel.jdbc.JdbcDataContextFactory;
+import org.apache.metamodel.query.CompiledQuery;
+import org.apache.metamodel.query.Query;
+import org.apache.metamodel.query.builder.InitFromBuilder;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread safe implementation to query on hive data context This implements
all methods of
+ * {@link org.apache.metamodel.DataContext} and {@link
org.apache.metamodel.UpdateableDataContext}
+ * and methods to establish a HiveConnection and execute row hive sql strings
on that connection
+ */
+public class HiveDataContext implements DataContext, UpdateableDataContext {
+
+ private static final Logger LOG =
LoggerFactory.getLogger((MethodHandles.lookup().lookupClass()));
+
+ private final ThreadLocal<JdbcDataContext> dataContext;
+ private final DataContextPropertiesImpl dataContextProperties;
+ private final JdbcDataContextFactory jdbcDataContextFactory;
+ private final BlockingQueue<JdbcDataContext> dataContextPool = new
LinkedBlockingQueue<>();
+ // provide if the hive data context is closed and this is used to
synchronise creating
+ // and closing jdbc data contexts
+ private Boolean isClosed;
+
+ public HiveDataContext(HiveStoreParameters hiveStoreParameters) {
+ jdbcDataContextFactory = new JdbcDataContextFactory();
+ dataContextProperties = generateDataContextProperties(hiveStoreParameters);
+ this.dataContext = new ThreadLocal<>();
+ this.isClosed = false;
+ }
+
+ /**
+ * Return jdbc data context assigned to the current thread.
+ *
+ * @return {@link org.apache.metamodel.jdbc.JdbcDataContext} Jdbc data
context
+ */
+ public JdbcDataContext getDataContext() {
+ // If the data context is not found or the connection is closed,
+ // a new data context will be created assuming the current thread accesses
+ // the data context for the first time or the previous connection closed
due to some exceptions
+ // thrown during the last query
+
+ JdbcDataContext jdbcDataContext = dataContext.get();
+ boolean connectionClosed = false;
+ if (jdbcDataContext != null) {
+ try {
+ connectionClosed = jdbcDataContext.getConnection().isClosed();
+ } catch (SQLException e) {
+ LOG.error("Checking connection status failed", e);
+ }
+ }
+ synchronized (isClosed) {
+ if ((jdbcDataContext == null || connectionClosed) && !isClosed) {
+ jdbcDataContext = (JdbcDataContext) jdbcDataContextFactory
+ .create(dataContextProperties, null);
+ dataContext.set(jdbcDataContext);
+ dataContextPool.add(jdbcDataContext);
+ }
+ }
+ return jdbcDataContext;
+ }
+
+ @Override
+ public DataContext refreshSchemas() {
+ try {
+ return getDataContext().refreshSchemas();
+ } catch (MetaModelException e) {
+ LOG.error("Refreshing schema failed", e);
+ return null;
+ }
+ }
+
+ @Override
+ public List<Schema> getSchemas() {
+ return getDataContext().getSchemas();
+ }
+
+ @Override
+ public List<String> getSchemaNames() {
+ return getDataContext().getSchemaNames();
+ }
+
+ @Override
+ public Schema getDefaultSchema() {
+ return getDataContext().getDefaultSchema();
+ }
+
+ @Override
+ public Schema getSchemaByName(String s) {
+ return getDataContext().getSchemaByName(s);
+ }
+
+ @Override
+ public InitFromBuilder query() {
+ try {
+ return getDataContext().query();
+ } catch (MetaModelException e) {
+ LOG.error("Initiating a query failed", e);
+ return null;
+ }
+ }
+
+ @Override
+ public Query parseQuery(String s) {
+ return getDataContext().parseQuery(s);
+ }
+
+ @Override
+ public DataSet executeQuery(Query query) {
+ return getDataContext().executeQuery(query);
+ }
+
+ @Override
+ public CompiledQuery compileQuery(Query query) {
+ return getDataContext().compileQuery(query);
+ }
+
+ @Override
+ public DataSet executeQuery(CompiledQuery compiledQuery, Object... objects) {
+ return getDataContext().executeQuery(compiledQuery, objects);
+ }
+
+ @Override
+ public DataSet executeQuery(String s) {
+ return getDataContext().executeQuery(s);
+ }
+
+ @Override
+ public UpdateSummary executeUpdate(UpdateScript updateScript) {
+ return getDataContext().executeUpdate(updateScript);
+ }
+
+ @Override
+ public Column getColumnByQualifiedLabel(String s) {
+ return getDataContext().getColumnByQualifiedLabel(s);
+ }
+
+ @Override
+ public Table getTableByQualifiedLabel(String s) {
+ return getDataContext().getTableByQualifiedLabel(s);
+ }
+
+ /**
+ * Close hive data context and clear all its subsequent jdbc data contexts
+ */
+ public void close() {
+ synchronized (isClosed) {
+ if (!isClosed) {
+ for (JdbcDataContext jdbcDataContext : dataContextPool) {
+ Connection connection = jdbcDataContext.getConnection();
+ jdbcDataContext.close(connection);
+ }
+ dataContextPool.clear();
+ isClosed = true;
+ }
+ }
+ }
+
+ /**
+ * Execute hive query sql
+ *
+ * @param hiveQuery query to be executed
+ * @throws GoraException throw if a SQLException is thrown
+ */
+ public void executeHiveQL(String hiveQuery) throws GoraException {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Executing the query : {}", hiveQuery);
+ }
+ Connection connection = getDataContext().getConnection();
+ Statement statement = connection.createStatement();
+ statement.execute(hiveQuery);
+ statement.close();
+ } catch (SQLException e) {
+ throw new GoraException(e);
+ }
+ }
+
+ /**
+ * Create a prepared statement using the given hive sql query
+ *
+ * @param hiveQuery query to be executed
+ * @return {@link org.apache.hive.jdbc.HivePreparedStatement} hive prepared
statement
+ * @throws GoraException throw if a SQLException is thrown
+ */
+ public PreparedStatement getPreparedStatement(String hiveQuery) throws
GoraException {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initiating prepared statement for the query : {}",
hiveQuery);
+ }
+ Connection connection = getDataContext().getConnection();
+ return connection.prepareStatement(hiveQuery);
+ } catch (SQLException e) {
+ throw new GoraException(e);
+ }
+ }
+
+ /**
+ * Generate DataContextPropertiesImpl using basic properties to establish a
connection to Hive
+ * backend service
+ *
+ * @param hiveStoreParameters hive store parameters including at least
server url
+ * @return DataContextPropertiesImpl connection properties
+ */
+ private DataContextPropertiesImpl generateDataContextProperties(
+ HiveStoreParameters hiveStoreParameters) {
+ final DataContextPropertiesImpl properties = new
DataContextPropertiesImpl();
+ properties.put(DataContextPropertiesImpl.PROPERTY_DATA_CONTEXT_TYPE,
+ HiveStoreParameters.HIVE_DATA_CONTEXT_TYPE);
+ properties.put(DataContextPropertiesImpl.PROPERTY_URL,
hiveStoreParameters.getServerUrl());
+ properties
+ .put(DataContextPropertiesImpl.PROPERTY_DRIVER_CLASS,
hiveStoreParameters.getDriverName());
+ return properties;
+ }
+}
\ No newline at end of file
diff --git
a/gora-hive/src/main/java/org/apache/gora/hive/store/HiveMapping.java
b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveMapping.java
new file mode 100644
index 0000000..9def8a7
--- /dev/null
+++ b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveMapping.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hive.store;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Mapping definition used by the Hive store. It holds the Hive table name, the list of mapped
+ * field names, and a lookup from column names to field names.
+ */
+public class HiveMapping {
+
+  public static final String DEFAULT_KEY_NAME = "primary_key";
+
+  /** Name of the Hive table backing the schema. */
+  private String tableName;
+
+  /** Field names belonging to the schema. */
+  private List<String> fields;
+
+  /** Lookup from column name to field name. */
+  private Map<String, String> columnFieldMap;
+
+  /**
+   * Creates a mapping with no table name, an empty field list and an empty column map.
+   */
+  public HiveMapping() {
+    this.fields = new ArrayList<>();
+    this.columnFieldMap = new HashMap<>();
+  }
+
+  /**
+   * @return the Hive table name, or null if it has not been set
+   */
+  public String getTableName() {
+    return this.tableName;
+  }
+
+  /**
+   * @param tableName Hive table name to use for the schema
+   */
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  /**
+   * @return the live list of field names (not a copy)
+   */
+  public List<String> getFields() {
+    return this.fields;
+  }
+
+  /**
+   * @param fields list of field names; stored as given, without copying
+   */
+  public void setFields(List<String> fields) {
+    this.fields = fields;
+  }
+
+  /**
+   * @return the live column-name to field-name map (not a copy)
+   */
+  public Map<String, String> getColumnFieldMap() {
+    return this.columnFieldMap;
+  }
+
+  /**
+   * @param columnFieldMap column-name to field-name map; stored as given, without copying
+   */
+  public void setColumnFieldMap(Map<String, String> columnFieldMap) {
+    this.columnFieldMap = columnFieldMap;
+  }
+}
diff --git
a/gora-hive/src/main/java/org/apache/gora/hive/store/HiveMappingBuilder.java
b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveMappingBuilder.java
new file mode 100644
index 0000000..fe0d931
--- /dev/null
+++ b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveMappingBuilder.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hive.store;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.gora.util.GoraException;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is for reading a given mapping file and building the hive
mappings
+ *
+ * @param <K> Key class
+ * @param <T> Persistent class
+ */
+public class HiveMappingBuilder<K, T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger((MethodHandles.lookup().lookupClass()));
+
+ //Tag names
+ private static final String CLASS_TAG = "class";
+ private static final String FIELD_TAG = "field";
+
+ //Attribute names
+ private static final String KEYCLASS_ATTRIBUTE = "keyClass";
+ private static final String TABLE_ATTRIBUTE = "table";
+ private static final String NAME_ATTRIBUTE = "name";
+
+ private HiveStore<?, ?> dataStore;
+
+ public HiveMappingBuilder(HiveStore<K, ?> dataStore) {
+ this.dataStore = dataStore;
+ }
+
+ /**
+ * Reading the given file to build the Hive Mappings.
+ *
+ * @param filename path of the mapping file
+ * @return {@link org.apache.gora.hive.store.HiveMapping} hive mappings
+ * @throws GoraException exception in reading mappings
+ */
+ @SuppressWarnings("unchecked")
+ public HiveMapping readMappingFile(String filename) throws GoraException {
+ HiveMapping mapping = new HiveMapping();
+ final Class<T> persistentClass = (Class<T>) dataStore.getPersistentClass();
+ final Class<K> keyClass = (Class<K>) dataStore.getKeyClass();
+ final SAXBuilder saxBuilder = new SAXBuilder();
+ final InputStream is =
getClass().getClassLoader().getResourceAsStream(filename);
+ if (is == null) {
+ throw new GoraException("hive mapping file:" + filename + " could not be
loaded");
+ }
+ final Element root;
+ try {
+ root = saxBuilder.build(is).getRootElement();
+ } catch (JDOMException | IOException e) {
+ throw new GoraException("Reading hive mapping file : " + filename + "
failed", e);
+ }
+ List<Element> classElements = root.getChildren(CLASS_TAG);
+ if (classElements == null || classElements.isEmpty()) {
+ throw new GoraException(
+ "Could not find any class definition in the mapping file:" +
filename);
+ }
+ parseClassElements(mapping, persistentClass, keyClass, classElements);
+ if (!validateSchema(mapping)) {
+ throw new GoraException("Schema validation failed for the mapping file:
" + filename);
+ }
+ return mapping;
+ }
+
+ /**
+ * Validate hive schema mappings
+ *
+ * @param mapping Hive mappings
+ * @return true if a valid schema. otherwise false
+ */
+ private boolean validateSchema(HiveMapping mapping) {
+ List<String> fields = mapping.getFields();
+ if (fields == null || fields.isEmpty()) {
+ LOG.error("table should have at least single column");
+ return false;
+ } else {
+ for (String fieldName : fields) {
+ if (HiveMapping.DEFAULT_KEY_NAME.equals(fieldName)) {
+ LOG.error("\'{}\' is a reserved keyword and cannot be used as a
field name",
+ HiveMapping.DEFAULT_KEY_NAME);
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void parseClassElements(HiveMapping hiveMapping, Class<T>
persistentClass,
+ Class<K> keyClass, List<Element> classElements) {
+ for (Element classElement : classElements) {
+ String persistentClassName =
classElement.getAttributeValue(NAME_ATTRIBUTE);
+ String keyClassName = classElement.getAttributeValue(KEYCLASS_ATTRIBUTE);
+ //Find a class which matches persistent class name and key class name
+ if (persistentClassName != null && keyClassName != null &&
persistentClassName
+ .equals(persistentClass.getName()) && keyClassName
+ .equals(keyClass.getName())) {
+ hiveMapping.setTableName(dataStore
+ .getSchemaName(classElement.getAttributeValue(TABLE_ATTRIBUTE),
+ dataStore.getPersistentClass()));
+ List<Element> fieldElments = classElement.getChildren(FIELD_TAG);
+ parseFieldElements(hiveMapping, fieldElments);
+ break;
+ }
+ }
+ }
+
+ private void parseFieldElements(HiveMapping hiveMapping, List<Element>
fieldElments) {
+ if (fieldElments != null) {
+ List<String> fieldNames = new ArrayList<>();
+ Map<String, String> columnFieldMap = new HashMap<>();
+ for (Element field : fieldElments) {
+ String fieldName =
field.getAttributeValue(HiveMappingBuilder.NAME_ATTRIBUTE);
+ if (fieldName == null || fieldName.isEmpty()) {
+ LOG.warn("Field without a name attribute is found and that field
will be ignored");
+ } else {
+ fieldNames.add(fieldName);
+ columnFieldMap.put(fieldName.toLowerCase(Locale.getDefault()),
fieldName);
+ }
+ }
+ hiveMapping.setFields(fieldNames);
+ hiveMapping.setColumnFieldMap(columnFieldMap);
+ }
+ }
+}
\ No newline at end of file
diff --git a/gora-hive/src/main/java/org/apache/gora/hive/store/HiveStore.java
b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveStore.java
new file mode 100644
index 0000000..d04574e
--- /dev/null
+++ b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveStore.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hive.store;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema.Field;
+import org.apache.gora.hive.query.HiveQuery;
+import org.apache.gora.hive.query.HiveResult;
+import org.apache.gora.hive.util.HiveQueryBuilder;
+import org.apache.gora.hive.util.HiveResultParser;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.GoraException;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.delete.DeleteFrom;
+import org.apache.metamodel.drop.DropTable;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.query.builder.SatisfiedSelectBuilder;
+import org.apache.metamodel.query.builder.SatisfiedWhereBuilder;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of a Hive data store to be used by gora.
+ *
+ * @param <K> class to be used for the key
+ * @param <T> class to be persisted within the store
+ */
+public class HiveStore<K, T extends PersistentBase> extends DataStoreBase<K,
T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveStore.class);
+
+ private static final String PARSE_MAPPING_FILE_KEY =
"gora.hive.mapping.file";
+
+ private static final String DEFAULT_MAPPING_FILE = "gora-hive-mapping.xml";
+
+
+ private volatile HiveDataContext dataContext;
+ private HiveStoreParameters hiveStoreParameters;
+ private HiveMapping mapping;
+ private Table schemaTable;
+ private HiveQueryBuilder queryBuilder;
+ private HiveResultParser resultParser;
+
+ @Override
+ public void initialize(Class<K> keyClass, Class<T> persistentClass,
Properties properties)
+ throws GoraException {
+ LOG.debug("Initializing Hive store");
+ super.initialize(keyClass, persistentClass, properties);
+ hiveStoreParameters = HiveStoreParameters.load(properties);
+ mapping = new HiveMappingBuilder<Object, Object>((HiveStore<Object, ?>)
this)
+ .readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY,
DEFAULT_MAPPING_FILE));
+ try {
+ dataContext = new HiveDataContext(hiveStoreParameters);
+ } catch (Exception e) {
+ LOG.error("Data context creation failed", e);
+ throw new GoraException(e);
+ }
+ queryBuilder = new HiveQueryBuilder(this, mapping);
+ resultParser = new HiveResultParser(this);
+ }
+
+ @Override
+ public String getSchemaName() {
+ return getSchemaName(mapping.getTableName(), persistentClass);
+ }
+
+ @Override
+ protected String getSchemaName(String mappingSchemaName, Class<?>
persistentClass) {
+ return super.getSchemaName(mappingSchemaName, persistentClass);
+ }
+
+ @Override
+ public void createSchema() throws GoraException {
+ if (!schemaExists()) {
+ LOG.info("Creating hive schema {}", getSchemaName());
+ String hiveQuery =
queryBuilder.buildCreateQuery(hiveStoreParameters.getDatabaseName(),
+ fieldMap);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Hive schema creation query : {}", hiveQuery);
+ }
+ dataContext.executeHiveQL(hiveQuery);
+ dataContext.refreshSchemas();
+ }
+ }
+
+ @Override
+ public void deleteSchema() throws GoraException {
+ if (schemaExists()) {
+ LOG.info("Deleting hive schema {}", getSchemaName());
+ DropTable dropTable = new DropTable(getSchemaTable());
+ dataContext.executeUpdate(dropTable);
+ dataContext.refreshSchemas();
+ schemaTable = null;
+ }
+ }
+
+ @Override
+ public boolean schemaExists() throws GoraException {
+ Table table = getSchemaTable();
+ return (table != null);
+ }
+
+ @Override
+ public boolean exists(K key) throws GoraException {
+ DataSet dataSet = executeGetQuery(key, new
String[]{HiveMapping.DEFAULT_KEY_NAME});
+ return (dataSet != null && dataSet.next());
+ }
+
+ @Override
+ public T get(K key, String[] fields) throws GoraException {
+ if (fields == null || fields.length == 0) {
+ fields = getFields();
+ }
+ DataSet dataSet = executeGetQuery(key, fields);
+ return newInstance(dataSet);
+ }
+
+ /**
+ * Put an persistent object into the hive store Though we use a key value,
currently hive does not
+ * validate integrity constraints and Hive server may not support update
queries on a particular
+ * key
+ *
+ * @param key the key of the object.
+ * @param obj the Persistent object.
+ * @throws GoraException throws if putting object is failed
+ */
+ @Override
+ public void put(K key, T obj) throws GoraException {
+ try {
+ List<Object> parementerList = new ArrayList<>();
+ String sql = queryBuilder.buildInsertQuery(key, obj, fieldMap,
parementerList);
+ PreparedStatement statement = dataContext.getPreparedStatement(sql);
+ for (int i = 0; i < parementerList.size(); i++) {
+ statement.setObject(i + 1, parementerList.get(i));
+ }
+ statement.execute();
+ } catch (IOException | SQLException e) {
+ throw new GoraException(e);
+ }
+ }
+
+ /**
+ * This deletes a record from the hive store for the supplied key. But,
delete queries do not
+ * support in all hive servers and they require some specific server
configurations.
+ *
+ * @param key the key of the deleting entry.
+ */
+ @Override
+ public boolean delete(K key) throws GoraException {
+ Table table = getSchemaTable();
+ DeleteFrom delete = new
DeleteFrom(table).where(HiveMapping.DEFAULT_KEY_NAME).eq(key);
+ dataContext.executeUpdate(delete);
+ return true;
+ }
+
+ /**
+ * This deletes all the matching records from hive store. But, delete
queries do not support in
+ * all hive servers and they require some specific server configurations.
+ *
+ * @param query matching records to this query will be deleted.
+ * @return number of deleted entires. -1 if all entries were deleted
+ */
+ @Override
+ public long deleteByQuery(Query<K, T> query) throws GoraException {
+ if (query.getKey() == null && query.getStartKey() == null &&
query.getEndKey() == null) {
+ deleteSchema();
+ createSchema();
+ return -1;
+ } else {
+ try {
+ int deleteCount = 0;
+ Result<K, T> result = query.execute();
+ while (result.next()) {
+ if (this.delete(result.getKey())) {
+ deleteCount++;
+ }
+ }
+ return deleteCount;
+ } catch (Exception e) {
+ throw new GoraException(e);
+ }
+ }
+ }
+
+ @Override
+ public Result<K, T> execute(Query<K, T> query) throws GoraException {
+ String[] fields = query.getFields();
+ SatisfiedSelectBuilder<?> builder;
+ if (fields == null || fields.length == 0) {
+ builder = dataContext.query().from(getSchemaTable()).selectAll();
+ } else {
+ int fieldLength = fields.length;
+ fields = Arrays.copyOf(fields, fieldLength + 1);
+ fields[fieldLength] = HiveMapping.DEFAULT_KEY_NAME;
+ builder = dataContext.query().from(getSchemaTable()).select(fields);
+ }
+ K startKey = query.getStartKey();
+ K endKey = query.getEndKey();
+ if (startKey != null && startKey.equals(endKey)) {
+ builder.where(HiveMapping.DEFAULT_KEY_NAME).eq(startKey);
+ } else {
+ if (startKey != null) {
+
builder.where(HiveMapping.DEFAULT_KEY_NAME).greaterThanOrEquals(startKey);
+ }
+
+ if (endKey != null) {
+ builder.where(HiveMapping.DEFAULT_KEY_NAME).lessThanOrEquals(endKey);
+ }
+ }
+ if (query.getLimit() > 0) {
+ builder.limit(((Long) query.getLimit()).intValue());
+ }
+ return new HiveResult<>(this, query,
dataContext.executeQuery(builder.toQuery()));
+ }
+
+ @Override
+ public Query<K, T> newQuery() {
+ return new HiveQuery<>(this);
+ }
+
+ @Override
+ public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) {
+ List<PartitionQuery<K, T>> partitions = new ArrayList<>();
+ PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
+ query);
+ partitionQuery.setConf(getConf());
+ partitions.add(partitionQuery);
+ return partitions;
+ }
+
+ @Override
+ public void flush() {
+ dataContext.refreshSchemas();
+ }
+
+ @Override
+ public void close() {
+ dataContext.close();
+ }
+
+ @Override
+ protected String[] getFields() {
+ List<String> fields = mapping.getFields();
+ return fields.toArray(new String[0]);
+ }
+
+ /**
+ * Creates a new Persistent instance with the values in 'dataSet' for the
fields selected in the
+ * query
+ *
+ * @param dataSet result data set from a hive query
+ * @return a persistence class instance which content was deserialized
+ */
+ private T newInstance(DataSet dataSet) throws GoraException {
+ if (dataSet == null) {
+ return null;
+ }
+ dataSet.next();
+ return readObject(dataSet.getRow());
+ }
+
+ /**
+ * Return database schemaTable for a given schema name
+ *
+ * @return org.apache.metamodel.schema.Table schemaTable
+ */
+ private Table getSchemaTable() throws GoraException {
+ if (schemaTable != null &&
schemaTable.getName().equalsIgnoreCase(getSchemaName())) {
+ return schemaTable;
+ }
+ org.apache.metamodel.schema.Schema schema = dataContext
+ .getSchemaByName(hiveStoreParameters.getDatabaseName());
+ if (schema == null) {
+ throw new GoraException(
+ "Could not find database for name : " +
hiveStoreParameters.getDatabaseName());
+ }
+ schemaTable = schema.getTableByName(getSchemaName());
+ return schemaTable;
+ }
+
+ /**
+ * Read a resulted row object and parse it to a persistent object
+ *
+ * @param row {@link org.apache.metamodel.data.Row} Resulted row object
+ * @return Persistent object
+ * @throws GoraException throws if reading of data is failed
+ */
+ public T readObject(Row row) throws GoraException {
+ if (row == null || row.size() == 0) {
+ LOG.error("Data set is empty");
+ return null;
+ }
+ T persistent = newPersistent();
+ List<SelectItem> selectItems = row.getSelectItems();
+ for (SelectItem selectItem : selectItems) {
+ Column column = selectItem.getColumn();
+ if (HiveMapping.DEFAULT_KEY_NAME.equalsIgnoreCase(column.getName())) {
+ continue;
+ }
+ Field field =
fieldMap.get(mapping.getColumnFieldMap().get(column.getName()));
+ Object value = row.getValue(column);
+ if (value != null) {
+ persistent.put(field.name(), resultParser.parseSchema(field.schema(),
value));
+ persistent.isDirty(field.name());
+ }
+ }
+ persistent.clearDirty();
+ return persistent;
+ }
+
+ /**
+ * Read the key value from a given data row
+ *
+ * @param row Resulted data row
+ * @return value of the key field
+ * @throws GoraException throws if reading of the key value is failed
+ */
+ @SuppressWarnings("unchecked")
+ public K readKey(Row row) throws GoraException {
+ if (row == null || row.size() == 0) {
+ LOG.error("Data set is empty");
+ return null;
+ }
+ Column keyColumn =
getSchemaTable().getColumnByName(HiveMapping.DEFAULT_KEY_NAME);
+ return (K) row.getValue(keyColumn);
+ }
+
+ /**
+ * Execute a select query based on key value
+ *
+ * @param key key value
+ * @param fields filed names to be selected
+ * @return Resulted data set
+ * @throws GoraException throws if selection query is failed
+ */
+ private DataSet executeGetQuery(K key, String[] fields) throws GoraException
{
+ Table table = getSchemaTable();
+ SatisfiedWhereBuilder<?> query =
dataContext.query().from(table).select(fields)
+ .where(HiveMapping.DEFAULT_KEY_NAME).eq(key);
+ String sql = query.toQuery().toSql();
+ return dataContext.executeQuery(sql);
+ }
+}
+
diff --git
a/gora-hive/src/main/java/org/apache/gora/hive/store/HiveStoreParameters.java
b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveStoreParameters.java
new file mode 100644
index 0000000..710017d
--- /dev/null
+++
b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveStoreParameters.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hive.store;
+
+import java.util.Properties;
+
/**
 * Connection settings for a Hive backend, derived from the Gora datastore properties.
 * Instances are created only through {@link #load(Properties)}.
 */
public class HiveStoreParameters {

  /**
   * Property holding the Hive server JDBC URL. Example forms:
   * {@code jdbc:hive2://<host>:<port>/<db>;initFile=<file>},
   * {@code jdbc:hive2:///;initFile=<file>},
   * {@code jdbc:hive2://<host>:<port>/<db>;transportMode=http;httpPath=<http_endpoint>},
   * {@code jdbc:hive2://<host>:<port>/<db>;ssl=true;sslTrustStore=<trust_store_path>;trustStorePassword=<trust_store_password>},
   * {@code jdbc:hive2://<zookeeper quorum>/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2}.
   */
  private static final String HIVE_SERVER_URL_PROPERTY = "gora.hive.server.url";

  /** Property holding the Hive database name. */
  private static final String HIVE_DATABASE_NAME_PROPERTY = "gora.hive.database.name";

  /** Property holding the JDBC driver class name. */
  private static final String HIVE_DRIVER_NAME_PROPERTY = "gora.hive.driver.name";

  // Values used when the corresponding property is not set.
  private static final String HIVE_DEFAULT_DATABASE_NAME = "default";
  public static final String HIVE_DEFAULT_DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";
  // NOTE(review): presumably the data context type identifier; confirm at the usage site.
  public static final String HIVE_DATA_CONTEXT_TYPE = "jdbc";

  private String databaseName;
  private String serverUrl;
  private String driverName;

  /**
   * Not for direct use; call {@link #load(Properties)}.
   */
  private HiveStoreParameters() {
  }

  /**
   * Builds parameters from datastore properties. The database name falls back to
   * {@code default} and the driver to {@code org.apache.hive.jdbc.HiveDriver}; the server
   * URL has no fallback and is {@code null} when absent.
   *
   * @param properties hive properties
   * @return HiveStoreParameters object
   */
  public static HiveStoreParameters load(Properties properties) {
    final String database =
        properties.getProperty(HIVE_DATABASE_NAME_PROPERTY, HIVE_DEFAULT_DATABASE_NAME);
    final String url = properties.getProperty(HIVE_SERVER_URL_PROPERTY);
    final String driver =
        properties.getProperty(HIVE_DRIVER_NAME_PROPERTY, HIVE_DEFAULT_DRIVER_NAME);

    final HiveStoreParameters parameters = new HiveStoreParameters();
    parameters.setDatabaseName(database);
    parameters.setServerUrl(url);
    parameters.setDriverName(driver);
    return parameters;
  }

  /**
   * @return hive server JDBC URL, possibly {@code null}
   */
  public String getServerUrl() {
    return serverUrl;
  }

  /**
   * @param serverUrl hive server JDBC URL
   */
  public void setServerUrl(String serverUrl) {
    this.serverUrl = serverUrl;
  }

  /**
   * @return JDBC driver class name
   */
  public String getDriverName() {
    return driverName;
  }

  /**
   * @param driverName JDBC driver class name
   */
  public void setDriverName(String driverName) {
    this.driverName = driverName;
  }

  /**
   * @return hive database name
   */
  public String getDatabaseName() {
    return databaseName;
  }

  /**
   * @param databaseName hive database name
   */
  public void setDatabaseName(String databaseName) {
    this.databaseName = databaseName;
  }
}
diff --git
a/gora-hive/src/main/java/org/apache/gora/hive/store/package-info.java
b/gora-hive/src/main/java/org/apache/gora/hive/store/package-info.java
new file mode 100644
index 0000000..7057e86
--- /dev/null
+++ b/gora-hive/src/main/java/org/apache/gora/hive/store/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains all the Hive store related classes.
+ */
+package org.apache.gora.hive.store;
\ No newline at end of file
diff --git
a/gora-hive/src/main/java/org/apache/gora/hive/util/HiveQueryBuilder.java
b/gora-hive/src/main/java/org/apache/gora/hive/util/HiveQueryBuilder.java
new file mode 100644
index 0000000..dadd8ca
--- /dev/null
+++ b/gora-hive/src/main/java/org/apache/gora/hive/util/HiveQueryBuilder.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hive.util;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.gora.hive.store.HiveMapping;
+import org.apache.gora.hive.store.HiveStore;
+import org.apache.gora.persistency.impl.BeanFactoryImpl;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.ClassLoadingUtils;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is to build hive SQL queries for schema creation and inserting
items into the hive
+ * store
+ */
+public class HiveQueryBuilder {
+
+ private static final Logger LOG =
LoggerFactory.getLogger((MethodHandles.lookup().lookupClass()));
+
+ private HiveStore<?, ?> hiveStore;
+ private HiveMapping hiveMapping;
+
+ /**
+ * common characters for building queries
+ **/
+ private static final char QUESTION_SYMBOL = '?';
+ private static final char QUOTE_SYMBOL = '\'';
+ private static final char OPEN_BRACKET_SYMBOL = '(';
+ private static final char CLOSE_BRACKET_SYMBOL = ')';
+ private static final char COMMA_SYMBOL = ',';
+ private static final char SPACE_SYMBOL = ' ';
+ private static final char PERIOD_SYMBOL = '.';
+
+ public HiveQueryBuilder(HiveStore<?, ?> hiveStore, HiveMapping hiveMapping) {
+ this.hiveStore = hiveStore;
+ this.hiveMapping = hiveMapping;
+ }
+
+ /**
+ * Create hive sql query to create the schema table
+ *
+ * @param databaseName hive database name
+ * @param fieldMap map of avro fields to their field names
+ * @return hive sql query
+ * @throws GoraException throw if the generation of the sql is failed
+ */
+ public String buildCreateQuery(String databaseName, Map<String, Field>
fieldMap)
+ throws GoraException {
+ //Initiate create query
+ StringBuilder createQuery = new StringBuilder();
+ createQuery.append("create table if not exists ")
+ .append(databaseName).append(PERIOD_SYMBOL)
+ .append(hiveStore.getSchemaName()).append(OPEN_BRACKET_SYMBOL);
+ //Create an Avro schema including fields only in mappings
+ Schema avroSchema = createAvroSchema(fieldMap);
+ try {
+ buildColumnType(createQuery, avroSchema);
+ } catch (SerDeException e) {
+ throw new GoraException("Schema inspection has been failed.", e);
+ }
+ createQuery.append(CLOSE_BRACKET_SYMBOL);
+ return createQuery.toString();
+ }
+
+ /**
+ * Create hive query to insert persistent item into hive store
+ *
+ * @param key value of the key
+ * @param value item to be stored
+ * @param fieldMap schema fields mapping to their names
+ * @param parameterList empty list to be filled with parameters of the sql
+ * @return parameterized hive sql string.
+ * @throws GoraException throw if the generation of the sql is failed
+ */
+ public String buildInsertQuery(Object key, PersistentBase value, Map<String,
Field> fieldMap,
+ List<Object> parameterList)
+ throws GoraException {
+ StringBuilder insert = new StringBuilder();
+ StringBuilder fields = new StringBuilder();
+ StringBuilder values = new StringBuilder();
+
+ insert.append("insert into").append(SPACE_SYMBOL)
+ .append(hiveStore.getSchemaName()).append(OPEN_BRACKET_SYMBOL);
+ values.append("select").append(SPACE_SYMBOL);
+
+ //add key value
+ fields.append(HiveMapping.DEFAULT_KEY_NAME);
+ parameterList.add((key instanceof String) ? key.toString() : key);
+ values.append(QUESTION_SYMBOL);
+
+ //add field values
+ for (String fieldName : hiveMapping.getFields()) {
+ Field field = fieldMap.get(fieldName);
+ if (field == null) {
+ LOG.warn("{} is skipped as it is not recognised as a field in the
schema", fieldName);
+ continue;
+ }
+ Object fieldValue = value.get(field.pos());
+ if (value.isDirty(field.name()) && fieldValue != null) {
+ Object serializedValue = serializeValue(parameterList, field.schema(),
fieldValue);
+ if (serializedValue != null && (!(serializedValue instanceof String)
+ || !((String) serializedValue).isEmpty())) {
+ fields.append(COMMA_SYMBOL).append(SPACE_SYMBOL)
+ .append(field.name().toLowerCase(Locale.getDefault()));
+ values.append(COMMA_SYMBOL).append(SPACE_SYMBOL)
+ .append(serializedValue);
+ }
+ }
+ }
+ fields.append(CLOSE_BRACKET_SYMBOL).append(SPACE_SYMBOL);
+ insert.append(fields).append(values);
+ return insert.toString();
+ }
+
+ /**
+ * Inspect avro schema and append its fields with their types to the create
query.
+ *
+ * @param createQuery String builder to generate the create query
+ * @param schema Avro schema to derive column names and types
+ * @throws SerDeException Throw if schema inspection failed
+ */
+ private void buildColumnType(StringBuilder createQuery, Schema schema)
throws SerDeException {
+ AvroObjectInspectorGenerator generator = new
AvroObjectInspectorGenerator(schema);
+ List<TypeInfo> typeInfos = generator.getColumnTypes();
+ List<String> names = generator.getColumnNames();
+ for (int i = 0; i < names.size(); i++) {
+ createQuery.append(names.get(i)).append(" ")
+ .append(typeInfos.get(i).getTypeName());
+ if (i < names.size() - 1) {
+ createQuery.append(COMMA_SYMBOL).append(SPACE_SYMBOL);
+ }
+ }
+ }
+
+ /**
+ * Create an avro schema including keys and fields of the mapping.
+ *
+ * @param fieldMap Avro field map which maps each field to its name
+ * @return Genereated avro schema
+ * @throws GoraException throws if the schema generation is failed
+ */
+ private Schema createAvroSchema(Map<String, Field> fieldMap) throws
GoraException {
+ Class<?> persistentClass = hiveStore.getPersistentClass();
+ Schema avroSchema = Schema.createRecord(persistentClass.getSimpleName(),
null,
+ persistentClass.getPackage().getName(), false);
+ List<Field> avroFieldList = new ArrayList<>();
+
+ avroFieldList.add(new Field(HiveMapping.DEFAULT_KEY_NAME, getKeySchema(),
null, 1));
+
+ List<String> fieldNames = hiveMapping.getFields();
+ for (String fieldName : fieldNames) {
+ Field field = fieldMap.get(fieldName);
+ if (field == null) {
+ throw new GoraException(
+ "Could not find a avro field for field name : " + fieldName);
+ }
+ avroFieldList.add(new Field(field.name(), field.schema(), field.doc(),
field.defaultVal()));
+ }
+ avroSchema.setFields(avroFieldList);
+ return avroSchema;
+ }
+
+ /**
+ * Derive the schema for the key field
+ *
+ * @return schema for the key field
+ * @throws GoraException throw if no schema could identified for the key
class
+ */
+ private Schema getKeySchema() throws GoraException {
+ final String keyName =
hiveStore.getKeyClass().getSimpleName().toUpperCase(Locale.getDefault());
+ try {
+ Type keyType = Type.valueOf(keyName);
+ return Schema.create(keyType);
+ } catch (IllegalArgumentException | AvroRuntimeException e) {
+ throw new GoraException("Failed to derive schema for the key class name
: " + keyName, e);
+ }
+ }
+
+ /**
+ * Serialize the given value according to its schema. if a primitive type
has to be included as a
+ * a value, it will be added to the parameter list
+ *
+ * @param parameterList carries the list of parameters to be injected into
sql
+ * @param schema the value should be serialized based on the schema
+ * @param value the value to be serialized
+ * @return serialized value
+ * @throws GoraException throw if serialization is failed.
+ */
+ private Object serializeValue(List<Object> parameterList, Schema schema,
Object value)
+ throws GoraException {
+ if (value == null) {
+ return getNullValue(parameterList, schema);
+ }
+ final Type type = schema.getType();
+ switch (type) {
+ case RECORD:
+ return serializeRecord(parameterList, schema, value);
+ case MAP:
+ return serializeMap(parameterList, schema, value);
+ case ARRAY:
+ return serializeArray(parameterList, schema, value);
+ case UNION:
+ return serializeUnion(parameterList, schema, value);
+ case STRING:
+ case ENUM:
+ parameterList.add(value.toString());
+ return QUESTION_SYMBOL;
+ case BYTES:
+ return serializeBytes(parameterList, value);
+ default:
+ parameterList.add(value);
+ return QUESTION_SYMBOL;
+ }
+ }
+
+ /**
+ * A record is handled as a struct in hive context and the sql query for
structs are define as
+ * named_struct(field1, value1, field2, value2,..). Moreover nested structs
are not allowed to be
+ * null and they have to be empty structs which all fields represent
respective null values.
+ *
+ * @param parameterList carries the list of parameters to be injected into
sql
+ * @param schema the record schema
+ * @param value the record object
+ * @return serialized value
+ * @throws GoraException throw if serialization is failed.
+ */
+ private Object serializeRecord(List<Object> parameterList, Schema schema,
Object value)
+ throws GoraException {
+ if (value == null) {
+ return getNullValue(parameterList, schema);
+ } else if (!(value instanceof PersistentBase)) {
+ throw new GoraException("Record value is not a persistent object");
+ }
+ PersistentBase record = (PersistentBase) value;
+
+ StringBuilder valueBuilder = new StringBuilder();
+ valueBuilder.append("named_struct(");
+
+ List<Field> fields = schema.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ Field field = fields.get(i);
+
valueBuilder.append(QUOTE_SYMBOL).append(field.name()).append(QUOTE_SYMBOL)
+ .append(COMMA_SYMBOL).append(SPACE_SYMBOL);
+ valueBuilder.append(serializeValue(parameterList, field.schema(),
record.get(field.pos())));
+ if (i < fields.size() - 1) {
+ valueBuilder.append(",");
+ }
+ }
+ return valueBuilder.append(CLOSE_BRACKET_SYMBOL).toString();
+ }
+
+ /**
+ * Serialize a given map by serializing its values based on
schema.valueType. map values in the
+ * sql have to be defined as map(key1, value1, key2, value2,..). If the map
value is null or
+ * empty, an empty map \"map(null, null_value_type)\" should be returned and
it is required to
+ * represent the null value of the respective value type in the first value
entry. Ex : if the map
+ * is binary type. the empty map will be map(null, binary(null))
+ *
+ * @param parameterList carries the list of parameters to be injected into
sql
+ * @param schema the map schema
+ * @param value the map object
+ * @return serialized value
+ * @throws GoraException throw if serialization is failed.
+ */
+ private Object serializeMap(List<Object> parameterList, Schema schema,
Object value)
+ throws GoraException {
+ if (value == null) {
+ return getNullValue(parameterList, schema);
+ } else if (!(value instanceof Map)) {
+ throw new GoraException("Value is not an object of java.util.Map for
schema type MAP");
+ }
+ Map<?, ?> map = (Map<?, ?>) value;
+ if (map.keySet().isEmpty()) {
+ return getNullValue(parameterList, schema);
+ }
+ //create a map serializing all its entries
+ StringBuilder valueBuilder = new StringBuilder();
+ valueBuilder.append("map(");
+ int size = map.keySet().size();
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+
valueBuilder.append(QUOTE_SYMBOL).append(entry.getKey().toString()).append(QUOTE_SYMBOL)
+ .append(COMMA_SYMBOL).append(SPACE_SYMBOL)
+ .append(serializeValue(parameterList, schema.getValueType(),
entry.getValue()));
+ if (size-- > 1) {
+ valueBuilder.append(COMMA_SYMBOL).append(SPACE_SYMBOL);
+ }
+ }
+ return valueBuilder.append(CLOSE_BRACKET_SYMBOL).toString();
+ }
+
+ /**
+ * Serialize a given list by serializing its values based on
schema.elementType. list values in
+ * the sql have to be defined as array(value1, value2,..). If the list is
null or empty, an empty
+ * list \"array(null)\" should be returned and it is required to represent
the null value of the
+ * respective value type. Ex : if the array is binary type. the empty array
will be
+ * array(binary(null))
+ *
+ * @param parameterList carries the list of parameters to be injected into
sql
+ * @param schema the array schema
+ * @param value the list object
+ * @return serialized value
+ * @throws GoraException throw if serialization is failed.
+ */
+ private Object serializeArray(List<Object> parameterList, Schema schema,
Object value)
+ throws GoraException {
+ if (!(value instanceof List) && value != null) {
+ throw new GoraException("Value is not an object of java.util.List for
schema type ARRAY");
+ }
+
+ List<?> list = (List<?>) value;
+ if (list == null || list.isEmpty()) {
+ return getNullValue(parameterList, schema);
+ } else {
+ StringBuilder valueBuilder = new StringBuilder();
+ valueBuilder.append("array(");
+ for (int i = 0; i < list.size(); i++) {
+ valueBuilder.append(serializeValue(parameterList,
schema.getElementType(), list.get(i)));
+ if (i < list.size() - 1) {
+ valueBuilder.append(", ");
+ }
+ }
+ return valueBuilder.append(CLOSE_BRACKET_SYMBOL).toString();
+ }
+ }
+
+ /**
+ * Serialize a given object based on union types. If the union type has only
two values and one of
+ * them is null type, the schema is considered as a type of the not null
type. However if there
+ * are more than one not-null types, the value should be defined as
create_union(position,
+ * serialized_value). Ex : if the field is UNION<String, Float>, the query
should be
+ * "create_union(1, 'value')". the position is counted using only not-null
types.
+ *
+ * @param parameterList carries the list of parameters to be injected into
sql
+ * @param schema the union schema
+ * @param value the value object
+ * @return serialized value
+ * @throws GoraException throw if serialization is failed.
+ */
+ private Object serializeUnion(List<Object> parameterList, Schema schema,
Object value)
+ throws GoraException {
+ List<Schema> schemas = schema.getTypes();
+
+ if (schemas.size() == 2 && isNullable(schema)) {
+ return serializeValue(parameterList, getValidSchema(schema), value);
+ }
+
+ StringBuilder valueBuilder = new StringBuilder();
+ int count = 1;
+ for (Schema valueSchema : schemas) {
+ if (!Type.NULL.equals(valueSchema.getType())) {
+ if (isValidType(valueSchema.getType(), value)) {
+ valueBuilder.append("create_union(")
+ .append(count).append(COMMA_SYMBOL).append(SPACE_SYMBOL)
+ .append(serializeValue(parameterList, valueSchema, value))
+ .append(CLOSE_BRACKET_SYMBOL);
+ return valueBuilder.toString();
+ }
+ count++;
+ }
+ }
+ throw new GoraException("Serializing union value is failed");
+ }
+
+ /**
+ * Serialize a given object into a binary. The sql string is defined as
"binary(value)"
+ *
+ * @param parameterList carries the list of parameters to be injected into
sql
+ * @param value the byte object
+ * @return serialized value
+ */
+ private Object serializeBytes(List<Object> parameterList, Object value) {
+ if (value instanceof ByteBuffer) {
+ ByteBuffer clone = ByteBuffer.allocate(((ByteBuffer) value).capacity());
+ clone.put((ByteBuffer) value);
+ ((ByteBuffer) value).rewind();
+ clone.flip();
+ value = QUOTE_SYMBOL + Charset.defaultCharset().decode(clone).toString()
+ QUOTE_SYMBOL;
+ }
+ parameterList.add(value);
+ return "binary(?)";
+ }
+
+ /**
+ * Returns the first not-null sub-schema from a union schema
+ *
+ * @param schemas Union schema
+ * @return first valid sub-schema
+ * @throws GoraException throw if a valid schema is not found
+ */
+ static Schema getValidSchema(Schema schemas) throws GoraException {
+ for (Schema innerSchema : schemas.getTypes()) {
+ if (!Type.NULL.equals(innerSchema.getType())) {
+ return innerSchema;
+ }
+ }
+ throw new GoraException("Could not find a valid schema");
+ }
+
+ /**
+ * Generate the null value for a given schema type
+ *
+ * @param parameterList carries the list of parameters to be injected into
sql
+ * @param schema schema to get null type
+ * @return null value for the schema.type
+ * @throws GoraException throw if the null value generation is failed
+ */
+ private Object getNullValue(List<Object> parameterList, Schema schema)
throws GoraException {
+ final Type type = schema.getType();
+ switch (type) {
+ case BYTES:
+ return "binary(null)";
+ case MAP:
+ return "map(null," + getNullValue(parameterList, schema.getValueType())
+ + CLOSE_BRACKET_SYMBOL;
+ case ARRAY:
+ return "array(" + getNullValue(parameterList, schema.getElementType())
+ + CLOSE_BRACKET_SYMBOL;
+ case UNION:
+ return serializeUnion(parameterList, schema, null);
+ case RECORD:
+ Class<?> clazz;
+ try {
+ clazz = ClassLoadingUtils.loadClass(schema.getFullName());
+ } catch (ClassNotFoundException e) {
+ throw new GoraException(e);
+ }
+ @SuppressWarnings("unchecked") final PersistentBase emptyRecord =
(PersistentBase) new BeanFactoryImpl(
+ hiveStore.getKeyClass(), clazz).newPersistent();
+ return serializeRecord(parameterList, schema, emptyRecord);
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Compare the object type and the required schema type and return true if
the object type is
+ * compatible with the schema type. if the type matches non of the defined
types, the object is
+ * considered to be a valid object for any type
+ *
+ * @param type schema type to be compared
+ * @param value object to be compared
+ * @return true if object is compatible with the schema type
+ */
+ private boolean isValidType(Type type, Object value) {
+ switch (type) {
+ case INT:
+ return (value instanceof Integer);
+ case LONG:
+ return (value instanceof Long);
+ case BYTES:
+ return (value instanceof Byte[]) || (value instanceof ByteBuffer);
+ case NULL:
+ return (value == null);
+ case STRING:
+ return (value instanceof CharSequence);
+ case ENUM:
+ return (value instanceof Enum);
+ case DOUBLE:
+ return (value instanceof Double);
+ case FLOAT:
+ return (value instanceof Float);
+ case BOOLEAN:
+ return (value instanceof Boolean);
+ case MAP:
+ return (value instanceof Map);
+ case ARRAY:
+ return (value instanceof List) || (value != null &&
value.getClass().isArray());
+ case RECORD:
+ return (value instanceof PersistentBase);
+ default:
+ return true;
+ }
+ }
+
+ /**
+ * Check whether the given union schema contains any nullable sub-schemas
+ *
+ * @param schemas union schema
+ * @return true if the list of sub-schemas contain a nullable schema
+ */
+ static boolean isNullable(Schema schemas) {
+ for (Schema innerSchema : schemas.getTypes()) {
+ if (innerSchema.getType().equals(Type.NULL)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git
a/gora-hive/src/main/java/org/apache/gora/hive/util/HiveResultParser.java
b/gora-hive/src/main/java/org/apache/gora/hive/util/HiveResultParser.java
new file mode 100644
index 0000000..5ff436f
--- /dev/null
+++ b/gora-hive/src/main/java/org/apache/gora/hive/util/HiveResultParser.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hive.util;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.hive.store.HiveStore;
+import org.apache.gora.persistency.impl.BeanFactoryImpl;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.AvroUtils;
+import org.apache.gora.util.ClassLoadingUtils;
+import org.apache.gora.util.GoraException;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class parses a given result set into a persistent schema object
+ */
+public class HiveResultParser {
+
+ private static final Logger LOG =
LoggerFactory.getLogger((MethodHandles.lookup().lookupClass()));
+
+ private HiveStore<?, ?> hiveStore;
+
+ public HiveResultParser(HiveStore<?, ?> hiveStore) {
+ this.hiveStore = hiveStore;
+ }
+
+ /**
+ * Parsing a value based on given schema
+ * @param schema schema value to be used
+ * @param value value to be parsed
+ * @return parsed object value
+ * @throws GoraException throw if parsing the object is failed
+ */
+ public Object parseSchema(Schema schema, Object value) throws GoraException {
+ if (value == null) {
+ return null;
+ }
+ final Type type = schema.getType();
+ switch (type) {
+ case INT:
+ return (value instanceof String) ?
Integer.parseInt(String.valueOf(value)) : value;
+ case STRING:
+ return new Utf8(String.valueOf(value));
+ case ENUM:
+ return AvroUtils.getEnumValue(schema, String.valueOf(value));
+ case FLOAT:
+ return (value instanceof String) ?
Float.parseFloat(String.valueOf(value)) : value;
+ case DOUBLE:
+ return (value instanceof String) ?
Double.parseDouble(String.valueOf(value)) : value;
+ case LONG:
+ return (value instanceof String) ?
Long.parseLong(String.valueOf(value)) : value;
+ case BOOLEAN:
+ return (value instanceof String) ?
Boolean.parseBoolean(String.valueOf(value)) : value;
+ case BYTES:
+ return parseBytes(value);
+ case ARRAY:
+ return parseArray(value, schema);
+ case MAP:
+ return parseMap(value, schema);
+ case UNION:
+ return parseUnion(value, schema);
+ case RECORD:
+ return parseRecord(value, schema);
+ default:
+ return value;
+ }
+ }
+
+ /**
+ * Parse a given bytes value into a ByteBuffer. When bytes are stored using
a complex parent data
+ * type like map<string, bytes>, a quotation mark (') is added to the front
and trailer of the
+ * byte string it will be removed here
+ *
+ * @param value to parsed
+ * @return {@link ByteBuffer} parsed byte buffer
+ */
+ private Object parseBytes(Object value) {
+ if (value == null) {
+ return null;
+ }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Parsing byte string :{}", value);
+ }
+ String byteString = value.toString();
+ if (value instanceof byte[]) {
+ byte[] byteArray = (byte[]) value;
+ byteString = new String(byteArray, Charset.defaultCharset());
+ if ('\'' == byteString.charAt(0)) {
+ byteString = byteString.substring(1);
+ }
+ if ('\'' == byteString.charAt(byteString.length() - 1)) {
+ byteString = byteString.substring(0, byteString.length() - 1);
+ }
+ }
+ return ByteBuffer.wrap(byteString.getBytes(Charset.defaultCharset()));
+ }
+
+ /**
+ * Arrays are returned as json array strings and this will parse them back
to lists.
+ *
+ * @param value json array string
+ * @return {@link List}parsed list
+ * @throws GoraException throw if parsing array values is failed
+ */
+ private Object parseArray(Object value, Schema schema) throws GoraException {
+ if (value == null) {
+ return null;
+ }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Parsing json array : {}", value);
+ }
+ Schema elementSchema = schema.getElementType();
+ JSONArray jsonArray = new JSONArray(String.valueOf(value));
+ List<Object> valueList = new ArrayList<>();
+ for (int i = 0; i < jsonArray.length(); i++) {
+ valueList.add(parseSchema(elementSchema, jsonArray.get(i)));
+ }
+ return valueList;
+ }
+
+ /**
+ * Maps are returned as json objects and this will parse them back to map
objects
+ *
+ * @param value String of json object
+ * @param schema Map schema to be used for parsing map values
+ * @return {@link Map} Parsed map
+ * @throws GoraException throw if parsing map entries is failed
+ */
+ private Object parseMap(Object value, Schema schema) throws GoraException {
+ if (value == null) {
+ return null;
+ }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Parsing json object:{} as a map", value);
+ }
+ Schema valueSchema = schema.getValueType();
+ JSONObject jsonObject = new JSONObject(String.valueOf(value));
+ Map<CharSequence, Object> valueMap = new HashMap<>();
+ Iterator<String> keys = jsonObject.keys();
+ while (keys.hasNext()) {
+ String key = keys.next();
+ valueMap.put(new Utf8(key), parseSchema(valueSchema,
jsonObject.get(key)));
+ }
+ return valueMap;
+ }
+
+ /**
+ * This parses the given object into union typed object. If there is only
one not-null sub-type,
+ * the values parsed according to that schema regarless of this union
schema. If there are more
+ * than one not-null sub-types, the object is consided to be a json string
with one field
+ * representing the position of the type of the given value among the the
list of sub-types.<br>
+ * Ex: if the schema is union<String, Array<int>>, the object will be {2,
'[1,2,3]'}
+ *
+ * @param value union object
+ * @param schema union schema to be used for parsing map values
+ * @return {@link Object} Parsed object
+ * @throws GoraException throw if parsing union value is failed
+ */
+ private Object parseUnion(Object value, Schema schema) throws GoraException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Parsing json object:{} as a union", value);
+ }
+ List<Schema> typeSchemaList = schema.getTypes();
+ Schema primarySchema = null;
+ if (typeSchemaList.size() == 2 && HiveQueryBuilder.isNullable(schema)) {
+ primarySchema = HiveQueryBuilder.getValidSchema(schema);
+ } else {
+ JSONObject unionObject = new JSONObject(String.valueOf(value));
+ int position = Integer.parseInt(unionObject.keys().next());
+ for (Schema typeSchema : typeSchemaList) {
+ if (!(Type.NULL.equals(typeSchema.getType())) && (position-- == 0)) {
+ primarySchema = typeSchema;
+ break;
+ }
+ }
+ }
+ if (primarySchema == null) {
+ return null;
+ } else {
+ return parseSchema(primarySchema, value);
+ }
+ }
+
+ /**
+ * Parse a json object to a persistent record.
+ *
+ * @param value json object string
+ * @param schema record schema to be used for parsing the object
+ * @return persistent object
+ * @throws GoraException throw if parsing record value is failed
+ */
+ private Object parseRecord(Object value, Schema schema) throws GoraException
{
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Parsing json object:{} as a record", value);
+ }
+ Class<?> clazz;
+ try {
+ clazz = ClassLoadingUtils.loadClass(schema.getFullName());
+ } catch (ClassNotFoundException e) {
+ throw new GoraException(e);
+ }
+ @SuppressWarnings("unchecked") final PersistentBase record =
(PersistentBase) new BeanFactoryImpl(
+ hiveStore.getKeyClass(), clazz).newPersistent();
+ JSONObject recordObject = new JSONObject(String.valueOf(value));
+ for (Field recField : schema.getFields()) {
+ Schema innerSchema = recField.schema();
+ if (recordObject.has(recField.name())) {
+ record.put(recField.pos(), parseSchema(innerSchema,
recordObject.get(recField.name())));
+ }
+ }
+ return record;
+ }
+}
diff --git
a/gora-hive/src/main/java/org/apache/gora/hive/util/package-info.java
b/gora-hive/src/main/java/org/apache/gora/hive/util/package-info.java
new file mode 100644
index 0000000..884dd6b
--- /dev/null
+++ b/gora-hive/src/main/java/org/apache/gora/hive/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains all utility classes to execute queries on Hive store
+ */
+package org.apache.gora.hive.util;
\ No newline at end of file
diff --git
a/gora-hive/src/test/java/org/apache/gora/hive/GoraHiveTestDriver.java
b/gora-hive/src/test/java/org/apache/gora/hive/GoraHiveTestDriver.java
new file mode 100644
index 0000000..9354940
--- /dev/null
+++ b/gora-hive/src/test/java/org/apache/gora/hive/GoraHiveTestDriver.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hive;
+
+import org.apache.gora.GoraTestDriver;
+import org.apache.gora.hive.store.HiveStore;
+import org.apache.gora.hive.util.HiveTestServer;
+
+public class GoraHiveTestDriver extends GoraTestDriver {
+
+ private HiveTestServer testServer;
+
+ public GoraHiveTestDriver() {
+ super(HiveStore.class);
+ }
+
+ @Override
+ public void setUpClass() throws Exception {
+ log.info("setting up hive test driver");
+ if (testServer == null) {
+ testServer = new HiveTestServer();
+ }
+ testServer.start();
+ }
+
+ @Override
+ public void tearDownClass() throws Exception {
+ log.info("tearing down hive test driver");
+ if (testServer != null) {
+ testServer.stop();
+ }
+ }
+}
diff --git a/gora-hive/src/test/java/org/apache/gora/hive/package-info.java
b/gora-hive/src/test/java/org/apache/gora/hive/package-info.java
new file mode 100644
index 0000000..958eaab
--- /dev/null
+++ b/gora-hive/src/test/java/org/apache/gora/hive/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Hive test driver implementation classes
+ */
+package org.apache.gora.hive;
\ No newline at end of file
diff --git
a/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java
b/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java
new file mode 100644
index 0000000..0d635f2
--- /dev/null
+++ b/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hive.store;
+
import static org.junit.Assert.assertTrue;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.gora.examples.generated.Employee;
import org.apache.gora.examples.generated.Metadata;
import org.apache.gora.examples.generated.WebPage;
import org.apache.gora.hive.GoraHiveTestDriver;
import org.apache.gora.persistency.impl.BeanFactoryImpl;
import org.apache.gora.store.DataStoreTestBase;
import org.apache.gora.store.DataStoreTestUtil;
import org.apache.gora.util.GoraException;
import org.apache.gora.util.StringUtils;
import org.junit.Ignore;
import org.junit.Test;
+
+/**
+ * HiveStore Tests extending {@link DataStoreTestBase} which run the base
JUnit test suite for
+ * Gora.
+ */
+public class TestHiveStore extends DataStoreTestBase {
+
+ static {
+ try {
+ setTestDriver(new GoraHiveTestDriver());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void assertSchemaExists(String schemaName) throws Exception {
+ assertTrue(employeeStore.schemaExists());
+ }
+
+ @Override
+ public void assertPut(Employee employee) throws GoraException {
+ employeeStore.put(employee.getSsn().toString(), employee);
+ }
+
+ @Override
+ public void testGetWithFields() throws Exception {
+ //Overrides DataStoreTestBase.testGetWithFields to avoid recursive field
"boss"
+ Employee employee = DataStoreTestUtil.createEmployee();
+ WebPage webpage = DataStoreTestUtil.createWebPage();
+ employee.setWebpage(webpage);
+ String ssn = employee.getSsn().toString();
+ employeeStore.put(ssn, employee);
+ employeeStore.flush();
+
+ String[] fields = ((HiveStore<String, Employee>)
employeeStore).getFields();
+ for (Set<String> subset : StringUtils.powerset(fields)) {
+ if (subset.isEmpty()) {
+ continue;
+ }
+ Employee after = employeeStore.get(ssn, subset.toArray(new
String[subset.size()]));
+ Employee expected = Employee.newBuilder().build();
+ for (String field : subset) {
+ int index = expected.getSchema().getField(field).pos();
+ expected.put(index, employee.get(index));
+ }
+
+ DataStoreTestUtil.assertEqualEmployeeObjects(expected, after);
+ }
+ }
+
+ @Override
+ public void testGet() throws Exception {
+ //Overrides DataStoreTestBase.testGet to avoid recursive field "boss"
+ log.info("test method: testGet");
+ employeeStore.createSchema();
+ Employee employee = DataStoreTestUtil.createEmployee();
+ String ssn = employee.getSsn().toString();
+ employeeStore.put(ssn, employee);
+ employeeStore.flush();
+ Employee after = employeeStore.get(ssn, null);
+ DataStoreTestUtil.assertEqualEmployeeObjects(employee, after);
+ }
+
+ @Override
+ public void testGetNested() throws Exception {
+ //Overrides DataStoreTestBase.testGetNested to avoid recursive field "boss"
+ Employee employee = DataStoreTestUtil.createEmployee();
+
+ WebPage webpage = new BeanFactoryImpl<>(String.class,
WebPage.class).newPersistent();
+ webpage.setUrl(new Utf8("url.."));
+ webpage.setContent(ByteBuffer.wrap("test
content".getBytes(Charset.defaultCharset())));
+ webpage.setParsedContent(new ArrayList<>());
+
+ Metadata metadata = new BeanFactoryImpl<>(String.class,
Metadata.class).newPersistent();
+ webpage.setMetadata(metadata);
+ employee.setWebpage(webpage);
+ String ssn = employee.getSsn().toString();
+
+ employeeStore.put(ssn, employee);
+ employeeStore.flush();
+ Employee after = employeeStore.get(ssn, null);
+ DataStoreTestUtil.assertEqualEmployeeObjects(employee, after);
+ DataStoreTestUtil.assertEqualWebPageObjects(webpage, after.getWebpage());
+ }
+
+ @Ignore("Hive test server doesn't support deleting and updating entries")
+ @Override
+ public void testExists() throws Exception {
+ //Hive test server doesn't support deleting and updating entries
+ }
+
+ @Ignore("Hive test server doesn't support deleting and updating entries")
+ @Override
+ public void testDelete() throws Exception {
+ //Hive test server doesn't support deleting and updating entries
+ }
+
+ @Ignore("Hive test server doesn't support deleting and updating entries")
+ @Override
+ public void testDeleteByQuery() throws Exception {
+ //Hive test server doesn't support deleting and updating entries
+ }
+
+ @Ignore("Hive test server doesn't support deleting and updating entries")
+ @Override
+ public void testDeleteByQueryFields() throws Exception {
+ //Hive test server doesn't support deleting and updating entries
+ }
+
+ @Ignore("Hive test server doesn't support deleting and updating entries")
+ @Override
+ public void testUpdate() throws Exception {
+ //Hive test server doesn't support deleting and updating entries
+ }
+
+ @Ignore("Hive datastore doesn't support recursive records")
+ @Override
+ public void testGetRecursive() throws Exception {
+ //Hive datastore doesn't support recursive records
+ }
+
+ @Ignore("Hive datastore doesn't support recursive records")
+ @Override
+ public void testGetDoubleRecursive() throws Exception {
+ //Hive datastore doesn't support recursive records
+ }
+
+ @Ignore("As recursive records are not supported, employee.boss field cannot
be processed.")
+ @Override
+ public void testGet3UnionField() throws Exception {
+ //As recursive records are not supported, employee.boss field cannot be
processed.
+ }
+}
diff --git
a/gora-hive/src/test/java/org/apache/gora/hive/store/package-info.java
b/gora-hive/src/test/java/org/apache/gora/hive/store/package-info.java
new file mode 100644
index 0000000..04d11b2
--- /dev/null
+++ b/gora-hive/src/test/java/org/apache/gora/hive/store/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains all test classes to test Hive store
+ */
+package org.apache.gora.hive.store;
\ No newline at end of file
diff --git
a/gora-hive/src/test/java/org/apache/gora/hive/util/HiveTestServer.java
b/gora-hive/src/test/java/org/apache/gora/hive/util/HiveTestServer.java
new file mode 100644
index 0000000..3ff464e
--- /dev/null
+++ b/gora-hive/src/test/java/org/apache/gora/hive/util/HiveTestServer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hive.util;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.Service;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.server.HiveServer2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Hive test server implementation
+ */
+public class HiveTestServer {
+
+  private static final Logger log =
+      LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Maximum time to wait for the server to accept a session after it has been started. */
+  private static final long STARTUP_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
+
+  /** Delay before each session-open attempt while waiting for the server to come up. */
+  private static final long STARTUP_POLL_INTERVAL_MILLIS = TimeUnit.SECONDS.toMillis(5);
+
+  /** The running server, or {@code null} when no server is started. Access is not synchronized. */
+  private HiveServer2 hiveServer;
+
+  /**
+   * Initiate a hive server instance, unless one is already running, and block until it
+   * accepts client sessions.
+   *
+   * <p>If any step of the startup fails, the partially started server is stopped and
+   * discarded before the exception is rethrown, so a later call can retry from scratch.
+   *
+   * @throws Exception throws if server initiation fails or the server does not accept a
+   *     session within the startup timeout
+   */
+  public void start() throws Exception {
+    if (hiveServer != null) {
+      return;
+    }
+    log.info("Starting Hive Test Server...");
+    // Assigned before startup because waitForStartup() looks up the CLIService through it.
+    hiveServer = new HiveServer2();
+    try {
+      hiveServer.init(new HiveConf());
+      hiveServer.start();
+      waitForStartup();
+    } catch (Exception e) {
+      // Without this cleanup hiveServer would stay non-null, and every later start() call
+      // would silently skip initialization.
+      try {
+        stop();
+      } catch (RuntimeException stopFailure) {
+        e.addSuppressed(stopFailure);
+      }
+      throw e;
+    }
+    log.info("Hive Test Server Started");
+  }
+
+  /**
+   * Waits for a maximum of one minute for the server to start, probing it by opening and
+   * closing a client session every five seconds.
+   *
+   * @throws TimeoutException if no session could be opened within the time limit; the
+   *     last session-open failure, if any, is attached as the cause
+   * @throws Exception if the wait is interrupted or closing the probe session fails
+   */
+  private void waitForStartup() throws Exception {
+    CLIService hs2Client = getServiceClientInternal();
+    long maxAttempts = STARTUP_TIMEOUT_MILLIS / STARTUP_POLL_INTERVAL_MILLIS;
+    Exception lastFailure = null;
+    for (long attempt = 1; attempt <= maxAttempts; attempt++) {
+      Thread.sleep(STARTUP_POLL_INTERVAL_MILLIS);
+      SessionHandle sessionHandle;
+      try {
+        Map<String, String> sessionConf = new HashMap<>();
+        sessionHandle = hs2Client.openSession("hive", "", sessionConf);
+      } catch (Exception e) {
+        // Server hasn't started yet: remember why and retry after the next interval.
+        lastFailure = e;
+        log.debug("Hive Test Server not ready (attempt {} of {})", attempt, maxAttempts, e);
+        continue;
+      }
+      // Close only a session that was actually opened. Closing a null handle from a
+      // finally block could throw and would abort the retry loop on the first failure.
+      hs2Client.closeSession(sessionHandle);
+      return;
+    }
+    TimeoutException timeout = new TimeoutException("Hive test server starting timeout");
+    if (lastFailure != null) {
+      timeout.initCause(lastFailure);
+    }
+    throw timeout;
+  }
+
+  /**
+   * Get the client service from the hive server instance.
+   *
+   * @return CLIService client service initiated in the hive server instance
+   * @throws IllegalStateException if the server exposes no {@link CLIService}
+   */
+  private CLIService getServiceClientInternal() {
+    for (Service service : hiveServer.getServices()) {
+      if (service instanceof CLIService) {
+        return (CLIService) service;
+      }
+    }
+    throw new IllegalStateException("Cannot find CLIService");
+  }
+
+  /**
+   * Stop hive test server, if one is running. The server reference is cleared even if
+   * stopping it throws, so a subsequent {@link #start()} creates a fresh instance.
+   */
+  public void stop() {
+    if (hiveServer != null) {
+      log.info("Stopping Hive Test Server...");
+      try {
+        hiveServer.stop();
+      } finally {
+        hiveServer = null;
+      }
+      log.info("Hive Test Server Stopped Successfully");
+    }
+  }
+}
diff --git
a/gora-hive/src/test/java/org/apache/gora/hive/util/package-info.java
b/gora-hive/src/test/java/org/apache/gora/hive/util/package-info.java
new file mode 100644
index 0000000..f90e729
--- /dev/null
+++ b/gora-hive/src/test/java/org/apache/gora/hive/util/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains the Hive test server implementation.
+ */
+package org.apache.gora.hive.util;
\ No newline at end of file
diff --git a/gora-hive/src/test/resources/gora-hive-mapping.xml
b/gora-hive/src/test/resources/gora-hive-mapping.xml
new file mode 100644
index 0000000..6d31df9
--- /dev/null
+++ b/gora-hive/src/test/resources/gora-hive-mapping.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<gora-otd>
+
+ <class name="org.apache.gora.examples.generated.Employee"
keyClass="java.lang.String" tableName="Employee">
+ <field name="name"/>
+ <field name="dateOfBirth"/>
+ <field name="ssn"/>
+ <field name="salary"/>
+ <field name="webpage"/>
+ </class>
+
+ <class name="org.apache.gora.examples.generated.WebPage"
keyClass="java.lang.String" tableName="WebPage">
+ <field name="url"/>
+ <field name="content"/>
+ <field name="parsedContent"/>
+ <field name="outlinks"/>
+ <field name="headers"/>
+ <field name="metadata"/>
+ <field name="byteData"/>
+ <field name="stringData"/>
+ </class>
+
+
+ <class name="org.apache.gora.examples.generated.TokenDatum"
keyClass="java.lang.String">
+ <field name="count"/>
+ </class>
+
+</gora-otd>
diff --git a/gora-hive/src/test/resources/gora.properties
b/gora-hive/src/test/resources/gora.properties
new file mode 100644
index 0000000..8f625fc
--- /dev/null
+++ b/gora-hive/src/test/resources/gora.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+############################
+# HiveStore properties #
+############################
+
+gora.datastore.autocreateschema=true
+gora.datastore.default=org.apache.gora.hive.store.HiveStore
+
+gora.hive.server.url=jdbc:hive2://localhost:10000/default
+gora.hive.database.name=default
+
diff --git a/gora-hive/src/test/resources/hive-site.xml
b/gora-hive/src/test/resources/hive-site.xml
new file mode 100755
index 0000000..ce35e8c
--- /dev/null
+++ b/gora-hive/src/test/resources/hive-site.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?><!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- Hive Test Server Configurations -->
+<configuration>
+ <property>
+ <name>hive.metastore.schema.verification</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>datanucleus.schema.autoCreateTables</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hive.security.authorization.enabled</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>javax.jdo.option.ConnectionURL</name>
+ <value>jdbc:derby:;databaseName=target/metastore_db;create=true</value>
+ </property>
+ <property>
+ <name>hive.metastore.sasl.enabled</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>datanucleus.schema.autoCreateAll</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>javax.jdo.option.ConnectionDriverName</name>
+ <value>org.apache.derby.jdbc.EmbeddedDriver</value>
+ </property>
+ <property>
+ <name>hive.metastore.schema.verification</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>hive.metastore.schema.verification.record.version</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>datanucleus.transactionIsolation</name>
+ <value>read-committed</value>
+ </property>
+ <property>
+ <name>datanucleus.cache.level2</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>datanucleus.cache.level2.type</name>
+ <value>none</value>
+ </property>
+ <property>
+ <name>datanucleus.rdbms.useLegacyNativeValueStrategy</name>
+ <value>true</value>
+ <description/>
+ </property>
+ <property>
+ <name>hive.server2.table.type.mapping</name>
+ <value>CLASSIC</value>
+ </property>
+ <property>
+ <name>hive.metastore.warehouse.dir</name>
+ <value>file://${user.dir}/target/hive</value>
+ </property>
+ <property>
+ <name>hive.strict.checks.bucketing</name>
+ <value>false</value>
+ </property>
+</configuration>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 767f02c..ec6f843 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,9 +31,9 @@
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<name>Apache Gora</name>
- <description>The Apache Gora open source framework provides an in-memory
data model and
- persistence for big data. Gora supports persisting to column stores, key
value stores,
- document stores and RDBMSs, and analyzing the data with extensive Apache
Hadoop MapReduce
+ <description>The Apache Gora open source framework provides an in-memory
data model and
+ persistence for big data. Gora supports persisting to column stores, key
value stores,
+ document stores and RDBMSs, and analyzing the data with extensive Apache
Hadoop MapReduce
support. </description>
<url>http://gora.apache.org</url>
<inceptionYear>2010</inceptionYear>
@@ -771,6 +771,7 @@
<module>gora-solr</module>
<module>gora-aerospike</module>
<module>gora-ignite</module>
+ <module>gora-hive</module>
<module>gora-tutorial</module>
<module>sources-dist</module>
</modules>
@@ -827,6 +828,12 @@
<orientdb.version>2.2.22</orientdb.version>
<orientqb.version>0.2.0</orientqb.version>
+ <!-- HiveStore Dependencies -->
+ <metamodel.version>5.3.0</metamodel.version>
+ <json.version>20180813</json.version>
+ <hive.version>2.3.5</hive.version>
+ <hadoop-common.version>2.6.0</hadoop-common.version>
+
<!-- Testing Dependencies -->
<junit.version>4.10</junit.version>
<test.container.version>1.4.2</test.container.version>
@@ -854,6 +861,7 @@
<pig.version>0.16.0</pig.version>
<!-- General Properties -->
+ <!--suppress UnresolvedMavenProperty -->
<implementation.build>${scmBranch}@r${buildNumber}</implementation.build>
<javac.src.version>1.8</javac.src.version>
<javac.target.version>1.8</javac.target.version>
@@ -959,6 +967,11 @@
</dependency>
<dependency>
<groupId>org.apache.gora</groupId>
+ <artifactId>gora-hive</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gora</groupId>
<artifactId>gora-dynamodb</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
@@ -1697,6 +1710,62 @@
<artifactId>ignite-core</artifactId>
<version>${ignite.version}</version>
</dependency>
+
+ <!-- Hive DataStore dependencies -->
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <version>${hive.version}</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-service</artifactId>
+ <version>${hive.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${hive.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.metamodel</groupId>
+ <artifactId>MetaModel-jdbc</artifactId>
+ <version>${metamodel.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.metamodel</groupId>
+ <artifactId>MetaModel-core</artifactId>
+ <version>${metamodel.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ <version>${json.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>