create template gora-ignite module

Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/4346216f
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/4346216f
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/4346216f

Branch: refs/heads/master
Commit: 4346216ffbed9a9adef2b90589436de543167c4c
Parents: fb4b355
Author: Carlos Rodrigo Muñoz <[email protected]>
Authored: Mon May 7 23:00:44 2018 -0500
Committer: Carlos Rodrigo Muñoz <[email protected]>
Committed: Mon May 7 23:01:18 2018 -0500

----------------------------------------------------------------------
 gora-ignite/pom.xml                             |  177 +++
 gora-ignite/src/examples/java/.gitignore        |   15 +
 .../gora/ignite/encoders/BinaryEncoder.java     |  191 ++++
 .../apache/gora/ignite/encoders/Encoder.java    |   72 ++
 .../apache/gora/ignite/encoders/HexEncoder.java |  204 ++++
 .../ignite/encoders/SignedBinaryEncoder.java    |  110 ++
 .../org/apache/gora/ignite/encoders/Utils.java  |   91 ++
 .../gora/ignite/encoders/package-info.java      |   20 +
 .../org/apache/gora/ignite/package-info.java    |   20 +
 .../apache/gora/ignite/query/IgniteQuery.java   |   45 +
 .../apache/gora/ignite/query/IgniteResult.java  |  101 ++
 .../apache/gora/ignite/query/package-info.java  |   21 +
 .../apache/gora/ignite/store/IgniteMapping.java |   44 +
 .../apache/gora/ignite/store/IgniteStore.java   | 1034 ++++++++++++++++++
 .../apache/gora/ignite/store/package-info.java  |   20 +
 .../ignite/util/FixedByteArrayOutputStream.java |   45 +
 .../apache/gora/ignite/util/package-info.java   |   20 +
 .../gora/ignite/GoraIgniteTestDriver.java       |   73 ++
 .../org/apache/gora/ignite/package-info.java    |   21 +
 .../ignite/store/AuthenticationTokenTest.java   |   90 ++
 .../gora/ignite/store/IgniteStoreTest.java      |   87 ++
 .../apache/gora/ignite/store/PartitionTest.java |   96 ++
 .../apache/gora/ignite/store/package-info.java  |   21 +
 .../apache/gora/ignite/util/HexEncoderTest.java |   56 +
 .../ignite/util/SignedBinaryEncoderTest.java    |  167 +++
 .../apache/gora/ignite/util/package-info.java   |   20 +
 .../test/resources/gora-accumulo-mapping.xml    |   59 +
 gora-ignite/src/test/resources/gora.properties  |   21 +
 pom.xml                                         |    1 +
 29 files changed, 2942 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/pom.xml
----------------------------------------------------------------------
diff --git a/gora-ignite/pom.xml b/gora-ignite/pom.xml
new file mode 100644
index 0000000..ddd3a4a
--- /dev/null
+++ b/gora-ignite/pom.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.gora</groupId>
+    <artifactId>gora</artifactId>
+    <version>0.9-SNAPSHOT</version>
+    <relativePath>../</relativePath>
+  </parent>
+  <artifactId>gora-ignite</artifactId>
+  <packaging>bundle</packaging>
+
+  <name>Apache Gora :: Ignite</name>
+  <url>http://gora.apache.org</url>
+  <description>The Apache Gora open source framework provides an in-memory 
data model and
+    persistence for big data. Gora supports persisting to column stores, key 
value stores,
+    document stores and RDBMSs, and analyzing the data with extensive Apache 
Hadoop MapReduce
+    support.</description>
+  <inceptionYear>2010</inceptionYear>
+  <organization>
+    <name>The Apache Software Foundation</name>
+    <url>http://www.apache.org/</url>
+  </organization>
+  <issueManagement>
+    <system>JIRA</system>
+    <url>https://issues.apache.org/jira/browse/GORA</url>
+  </issueManagement>
+  <ciManagement>
+    <system>Jenkins</system>
+    <url>https://builds.apache.org/job/Gora-trunk/</url>
+  </ciManagement>
+
+  <properties>
+    <ignite.version>1.7.1</ignite.version>
+    <osgi.import>*</osgi.import>
+    
<osgi.export>org.apache.gora.ignite*;version="${project.version}";-noimport:=true</osgi.export>
+  </properties>
+
+  <build>
+    <directory>target</directory>
+    <outputDirectory>target/classes</outputDirectory>
+    <finalName>${project.artifactId}-${project.version}</finalName>
+    <testOutputDirectory>target/test-classes</testOutputDirectory>
+    <testSourceDirectory>src/test/java</testSourceDirectory>
+    <sourceDirectory>src/main/java</sourceDirectory>
+    <testResources>
+      <testResource>
+        <directory>${project.basedir}/src/test/resources</directory>
+        <includes>
+          <include>**/*</include>
+        </includes>
+        <!--targetPath>${project.basedir}/target/classes/</targetPath -->
+      </testResource>
+    </testResources>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>${build-helper-maven-plugin.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/examples/java</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!-- Gora Internal Dependencies -->
+    <dependency>
+      <groupId>org.apache.gora</groupId>
+      <artifactId>gora-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.gora</groupId>
+      <artifactId>gora-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <!--Ignite Dependency -->
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-core</artifactId>
+      <version>${ignite.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-minicluster</artifactId>
+      <version>${ignite.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+
+    <dependency>
+      <!-- Ignite mini was trying to load the Shell, which failed with the 
version of jline in the parent pom -->
+      <groupId>jline</groupId>
+      <artifactId>jline</artifactId>
+      <version>2.11</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
+    <!-- Logging Dependencies -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.jms</groupId>
+          <artifactId>jms</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Testing Dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/examples/java/.gitignore
----------------------------------------------------------------------
diff --git a/gora-ignite/src/examples/java/.gitignore 
b/gora-ignite/src/examples/java/.gitignore
new file mode 100644
index 0000000..09697dc
--- /dev/null
+++ b/gora-ignite/src/examples/java/.gitignore
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/BinaryEncoder.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/BinaryEncoder.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/BinaryEncoder.java
new file mode 100644
index 0000000..ce6eded
--- /dev/null
+++ 
b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/BinaryEncoder.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.encoders;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.gora.ignite.util.FixedByteArrayOutputStream;
+
+/**
+ * 
+ */
+public class BinaryEncoder implements Encoder {
+
+  @Override
+  public byte[] encodeShort(short s) throws IOException {
+    return encodeShort(s, new byte[2]);
+  }
+
+  @Override
+  public byte[] encodeShort(short s, byte[] ret) throws IOException {
+    try (DataOutputStream dos = new DataOutputStream(new 
FixedByteArrayOutputStream(ret))){
+      dos.writeShort(s);
+      dos.close();
+      return ret;
+    }
+  }
+
+  @Override
+  public short decodeShort(byte[] a) throws IOException {
+    try (DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(a))){
+      short s = dis.readShort();
+      dis.close();
+      return s;
+    }
+  }
+
+  @Override
+  public byte[] encodeInt(int i) throws IOException {
+    return encodeInt(i, new byte[4]);
+  }
+
+  @Override
+  public byte[] encodeInt(int i, byte[] ret) throws IOException {
+    try (DataOutputStream dos = new DataOutputStream(new 
FixedByteArrayOutputStream(ret))){
+      dos.writeInt(i);
+      dos.close();
+      return ret;
+    }
+  }
+
+  @Override
+  public int decodeInt(byte[] a) throws IOException {
+    try (DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(a))){
+      int i = dis.readInt();
+      dis.close();
+      return i;
+    }
+  }
+
+  @Override
+  public byte[] encodeLong(long l) throws IOException {
+    return encodeLong(l, new byte[8]);
+  }
+
+  @Override
+  public byte[] encodeLong(long l, byte[] ret) throws IOException {
+    try (DataOutputStream dos = new DataOutputStream(new 
FixedByteArrayOutputStream(ret))){
+      dos.writeLong(l);
+      dos.close();
+      return ret;
+    }
+  }
+
+  @Override
+  public long decodeLong(byte[] a) throws IOException {
+    try (DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(a))){
+      long l = dis.readLong();
+      dis.close();
+      return l;
+    }
+  }
+
+  @Override
+  public byte[] encodeDouble(double d) throws IOException {
+    return encodeDouble(d, new byte[8]);
+  }
+
+  @Override
+  public byte[] encodeDouble(double d, byte[] ret) throws IOException {
+    try (DataOutputStream dos = new DataOutputStream(new 
FixedByteArrayOutputStream(ret))){
+      long l = Double.doubleToRawLongBits(d);
+      dos.writeLong(l);
+      dos.close();
+      return ret;
+    }
+  }
+
+  @Override
+  public double decodeDouble(byte[] a) throws IOException {
+    try (DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(a))){
+      long l = dis.readLong();
+      dis.close();
+      return Double.longBitsToDouble(l);
+    }
+  }
+
+  @Override
+  public byte[] encodeFloat(float d) throws IOException {
+    return encodeFloat(d, new byte[4]);
+  }
+
+  @Override
+  public byte[] encodeFloat(float f, byte[] ret) throws IOException {
+    try (DataOutputStream dos = new DataOutputStream(new 
FixedByteArrayOutputStream(ret))){
+      int i = Float.floatToRawIntBits(f);
+      dos.writeInt(i);
+      return ret;
+    }
+  }
+
+  @Override
+  public float decodeFloat(byte[] a) throws IOException {
+    try (DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(a))){
+      int i = dis.readInt();
+      return Float.intBitsToFloat(i);
+    }
+  }
+
+  @Override
+  public byte[] encodeByte(byte b, byte[] ret) {
+    ret[0] = 0;
+    return ret;
+  }
+
+  @Override
+  public byte[] encodeByte(byte b) {
+    return encodeByte(b, new byte[1]);
+  }
+
+  @Override
+  public byte decodeByte(byte[] a) {
+    return a[0];
+  }
+
+  @Override
+  public boolean decodeBoolean(byte[] a) throws IOException {
+    try (DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(a))){
+      return dis.readBoolean();
+    }
+  }
+
+  @Override
+  public byte[] encodeBoolean(boolean b) throws IOException {
+    return encodeBoolean(b, new byte[1]);
+  }
+
+  @Override
+  public byte[] encodeBoolean(boolean b, byte[] ret) throws IOException {
+    try (DataOutputStream dos = new DataOutputStream(new 
FixedByteArrayOutputStream(ret))){
+      dos.writeBoolean(b);
+      return ret;
+    }
+  }
+
+  @Override
+  public byte[] lastPossibleKey(int size, byte[] er) {
+    return Utils.lastPossibleKey(size, er);
+  }
+
+  @Override
+  public byte[] followingKey(int size, byte[] per) {
+    return Utils.followingKey(size, per);
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Encoder.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Encoder.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Encoder.java
new file mode 100644
index 0000000..deea4a7
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Encoder.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.encoders;
+
+import java.io.IOException;
+
+/**
+ * 
+ */
+public interface Encoder {
+  
+  public byte[] encodeByte(byte b, byte[] ret);
+  
+  public byte[] encodeByte(byte b);
+  
+  public byte decodeByte(byte[] a);
+
+  public byte[] encodeShort(short s) throws IOException;
+  
+  public byte[] encodeShort(short s, byte[] ret) throws IOException;
+  
+  public short decodeShort(byte[] a) throws IOException;
+  
+  public byte[] encodeInt(int i) throws IOException;
+  
+  public byte[] encodeInt(int i, byte[] ret) throws IOException;
+  
+  public int decodeInt(byte[] a) throws IOException;
+  
+  public byte[] encodeLong(long l) throws IOException;
+  
+  public byte[] encodeLong(long l, byte[] ret) throws IOException;
+  
+  public long decodeLong(byte[] a) throws IOException;
+  
+  public byte[] encodeDouble(double d) throws IOException;
+  
+  public byte[] encodeDouble(double d, byte[] ret) throws IOException;
+  
+  public double decodeDouble(byte[] a) throws IOException;
+  
+  public byte[] encodeFloat(float d) throws IOException;
+  
+  public byte[] encodeFloat(float f, byte[] ret) throws IOException;
+  
+  public float decodeFloat(byte[] a) throws IOException;
+  
+  public boolean decodeBoolean(byte[] val) throws IOException;
+  
+  public byte[] encodeBoolean(boolean b) throws IOException;
+  
+  public byte[] encodeBoolean(boolean b, byte[] ret) throws IOException;
+
+  byte[] followingKey(int size, byte[] per);
+
+  byte[] lastPossibleKey(int size, byte[] er);
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/HexEncoder.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/HexEncoder.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/HexEncoder.java
new file mode 100644
index 0000000..8568ba9
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/HexEncoder.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.encoders;
+
+/**
+ * Encodes data in a ascii hex representation
+ */
+
+public class HexEncoder implements Encoder {
+  
+  private byte[] chars = new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', 
'8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+
+  private void encode(byte[] a, long l) {
+    for (int i = a.length - 1; i >= 0; i--) {
+      a[i] = chars[(int) (l & 0x0f)];
+      l = l >>> 4;
+    }
+  }
+
+  private int fromChar(byte b) {
+    if (b >= '0' && b <= '9') {
+      return b - '0';
+    } else if (b >= 'a' && b <= 'f') {
+      return b - 'a' + 10;
+    }
+    
+    throw new IllegalArgumentException("Bad char " + b);
+  }
+  
+  private long decode(byte[] a) {
+    long b = 0;
+    for (byte anA : a) {
+      b = b << 4;
+      b |= fromChar(anA);
+    }
+    
+    return b;
+  }
+
+  @Override
+  public byte[] encodeByte(byte b, byte[] ret) {
+    encode(ret, 0xff & b);
+    return ret;
+  }
+  
+  @Override
+  public byte[] encodeByte(byte b) {
+    return encodeByte(b, new byte[2]);
+  }
+  
+  @Override
+  public byte decodeByte(byte[] a) {
+    return (byte) decode(a);
+  }
+  
+  @Override
+  public byte[] encodeShort(short s) {
+    return encodeShort(s, new byte[4]);
+  }
+  
+  @Override
+  public byte[] encodeShort(short s, byte[] ret) {
+    encode(ret, 0xffff & s);
+    return ret;
+  }
+  
+  @Override
+  public short decodeShort(byte[] a) {
+    return (short) decode(a);
+  }
+  
+  @Override
+  public byte[] encodeInt(int i) {
+    return encodeInt(i, new byte[8]);
+  }
+  
+  @Override
+  public byte[] encodeInt(int i, byte[] ret) {
+    encode(ret, i);
+    return ret;
+  }
+  
+  @Override
+  public int decodeInt(byte[] a) {
+    return (int) decode(a);
+  }
+  
+  @Override
+  public byte[] encodeLong(long l) {
+    return encodeLong(l, new byte[16]);
+  }
+  
+  @Override
+  public byte[] encodeLong(long l, byte[] ret) {
+    encode(ret, l);
+    return ret;
+  }
+  
+  @Override
+  public long decodeLong(byte[] a) {
+    return decode(a);
+  }
+  
+  @Override
+  public byte[] encodeDouble(double d) {
+    return encodeDouble(d, new byte[16]);
+  }
+  
+  @Override
+  public byte[] encodeDouble(double d, byte[] ret) {
+    return encodeLong(Double.doubleToRawLongBits(d), ret);
+  }
+  
+  @Override
+  public double decodeDouble(byte[] a) {
+    return Double.longBitsToDouble(decodeLong(a));
+  }
+  
+  @Override
+  public byte[] encodeFloat(float d) {
+    return encodeFloat(d, new byte[16]);
+  }
+  
+  @Override
+  public byte[] encodeFloat(float d, byte[] ret) {
+    return encodeInt(Float.floatToRawIntBits(d), ret);
+  }
+  
+  @Override
+  public float decodeFloat(byte[] a) {
+    return Float.intBitsToFloat(decodeInt(a));
+  }
+  
+  @Override
+  public boolean decodeBoolean(byte[] val) {
+      return decodeByte(val) == 1;
+  }
+  
+  @Override
+  public byte[] encodeBoolean(boolean b) {
+    return encodeBoolean(b, new byte[2]);
+  }
+  
+  @Override
+  public byte[] encodeBoolean(boolean b, byte[] ret) {
+    if (b)
+      encode(ret, 1);
+    else
+      encode(ret, 0);
+    
+    return ret;
+  }
+  
+  private byte[] toBinary(byte[] hex) {
+    byte[] bin = new byte[(hex.length / 2) + (hex.length % 2)];
+    
+    int j = 0;
+    for (int i = 0; i < bin.length; i++) {
+      bin[i] = (byte) (fromChar(hex[j++]) << 4);
+      if (j >= hex.length)
+        break;
+      bin[i] |= (byte) fromChar(hex[j++]);
+    }
+    
+    return bin;
+  }
+  
+  private byte[] fromBinary(byte[] bin) {
+    byte[] hex = new byte[bin.length * 2];
+    
+    int j = 0;
+    for (byte aBin : bin) {
+      hex[j++] = chars[0x0f & (aBin >>> 4)];
+      hex[j++] = chars[0x0f & aBin];
+    }
+    
+    return hex;
+  }
+
+  @Override
+  public byte[] followingKey(int size, byte[] per) {
+    return fromBinary(Utils.followingKey(size, toBinary(per)));
+  }
+  
+  @Override
+  public byte[] lastPossibleKey(int size, byte[] er) {
+    return fromBinary(Utils.lastPossibleKey(size, toBinary(er)));
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/SignedBinaryEncoder.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/SignedBinaryEncoder.java
 
b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/SignedBinaryEncoder.java
new file mode 100644
index 0000000..a8216f4
--- /dev/null
+++ 
b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/SignedBinaryEncoder.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.encoders;
+
+import java.io.IOException;
+
+/**
+ * This class transforms this bits within a primitive type so that 
+ * the bit representation sorts correctly lexographicaly. Primarily 
+ * it does some simple transformations so that negative numbers sort 
+ * before positive numbers, when compared lexographically.
+ */
+public class SignedBinaryEncoder extends BinaryEncoder {
+
+  @Override
+  public byte[] encodeShort(short s, byte[] ret) throws IOException{
+    s = (short)((s & 0xffff) ^ 0x8000);
+    return super.encodeShort(s, ret);
+  }
+
+  @Override
+  public short decodeShort(byte[] a) throws IOException{
+    short s = super.decodeShort(a);
+    s = (short)((s & 0xffff) ^ 0x8000);
+    return s;
+  }
+
+  @Override
+  public byte[] encodeInt(int i, byte[] ret) throws IOException{
+    i = i ^ 0x80000000;
+    return super.encodeInt(i, ret);
+  }
+
+  @Override
+  public int decodeInt(byte[] a) throws IOException{
+    int i = super.decodeInt(a);
+    i = i ^ 0x80000000;
+    return i;
+  }
+
+  @Override
+  public byte[] encodeLong(long l, byte[] ret) throws IOException{
+    l = l ^ 0x8000000000000000L;
+    return super.encodeLong(l, ret);
+  }
+
+  @Override
+  public long decodeLong(byte[] a) throws IOException {
+    long l = super.decodeLong(a);
+    l = l ^ 0x8000000000000000L;
+    return l;
+  }
+
+  @Override
+  public byte[] encodeDouble(double d, byte[] ret) throws IOException {
+    long l = Double.doubleToRawLongBits(d);
+    if(l < 0)
+      l = ~l;
+    else
+      l = l ^ 0x8000000000000000L;
+    return super.encodeLong(l,ret);
+  }
+
+  @Override
+  public double decodeDouble(byte[] a) throws IOException{
+    long l = super.decodeLong(a);
+    if(l < 0)
+      l = l ^ 0x8000000000000000L;
+    else
+      l = ~l;
+    return Double.longBitsToDouble(l);
+  }
+
+  @Override
+  public byte[] encodeFloat(float f, byte[] ret) throws IOException {
+    int i = Float.floatToRawIntBits(f);
+    if(i < 0)
+      i = ~i;
+    else
+      i = i ^ 0x80000000;
+
+    return super.encodeInt(i, ret);
+
+  }
+
+  @Override
+  public float decodeFloat(byte[] a) throws IOException{
+    int i = super.decodeInt(a);
+    if(i < 0)
+      i = i ^ 0x80000000;
+    else
+      i = ~i;
+    return Float.intBitsToFloat(i);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Utils.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Utils.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Utils.java
new file mode 100644
index 0000000..8a5980c
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Utils.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.encoders;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+
+/**
+ * 
+ */
+public class Utils {
+  private static BigInteger newPositiveBigInteger(byte[] er) {
+    byte[] copy = new byte[er.length + 1];
+    System.arraycopy(er, 0, copy, 1, er.length);
+    BigInteger bi = new BigInteger(copy);
+    return bi;
+  }
+  
+  public static byte[] lastPossibleKey(int size, byte[] er) {
+    if (size == er.length)
+      return er;
+    
+    if (er.length > size)
+      throw new IllegalArgumentException();
+    
+    BigInteger bi = newPositiveBigInteger(er);
+    if (bi.equals(BigInteger.ZERO))
+      throw new IllegalArgumentException("Nothing comes before zero");
+    
+    bi = bi.subtract(BigInteger.ONE);
+    
+    byte[] ret = new byte[size];
+    Arrays.fill(ret, (byte) 0xff);
+    
+    System.arraycopy(getBytes(bi, er.length), 0, ret, 0, er.length);
+    
+    return ret;
+  }
+  
+  private static byte[] getBytes(BigInteger bi, int minLen) {
+    byte[] ret = bi.toByteArray();
+    
+    if (ret[0] == 0) {
+      // remove leading 0 that makes num positive
+      byte[] copy = new byte[ret.length - 1];
+      System.arraycopy(ret, 1, copy, 0, copy.length);
+      ret = copy;
+    }
+    
+    // leading digits are dropped
+    byte[] copy = new byte[minLen];
+    if (bi.compareTo(BigInteger.ZERO) < 0) {
+      Arrays.fill(copy, (byte) 0xff);
+    }
+    System.arraycopy(ret, 0, copy, minLen - ret.length, ret.length);
+    
+    return copy;
+  }
+  
+  public static byte[] followingKey(int size, byte[] per) {
+    
+    if (per.length > size)
+      throw new IllegalArgumentException();
+    
+    if (size == per.length) {
+      // add one
+      BigInteger bi = new BigInteger(per);
+      bi = bi.add(BigInteger.ONE);
+      if (bi.equals(BigInteger.ZERO)) {
+        throw new IllegalArgumentException("Wrapped");
+      }
+      return getBytes(bi, size);
+    } else {
+      return Arrays.copyOf(per, size);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/package-info.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/package-info.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/package-info.java
new file mode 100644
index 0000000..574aa24
--- /dev/null
+++ 
b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Ignite store related util classes for encoder.
+ */
+package org.apache.gora.ignite.encoders;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/package-info.java
----------------------------------------------------------------------
diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/package-info.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/package-info.java
new file mode 100644
index 0000000..a7fa7ab
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Ignite datastore related all classes.
+ */
+package org.apache.gora.ignite;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java
new file mode 100644
index 0000000..85a59c9
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.query;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.impl.QueryBase;
+import org.apache.gora.store.DataStore;
+
+/**
+ *  Ignite specific implementation of the {@link org.apache.gora.query.Query} 
interface.
+ */
+public class IgniteQuery<K,T extends PersistentBase> extends QueryBase<K,T> {
+
+  /**
+   * Constructor for the query
+   */
+  public IgniteQuery() {
+    super(null);
+  }
+
+  /**
+   * Constructor for the query
+   *
+   * @param dataStore Data store used
+   *
+   */
+  public IgniteQuery(DataStore<K,T> dataStore) {
+    super(dataStore);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java
new file mode 100644
index 0000000..416e650
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.query;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.ignite.core.client.RowIterator;
+import org.apache.ignite.core.client.Scanner;
+import org.apache.ignite.core.data.ByteSequence;
+import org.apache.ignite.core.data.Key;
+import org.apache.ignite.core.data.Value;
+import org.apache.gora.ignite.store.IgniteStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.ResultBase;
+import org.apache.gora.store.DataStore;
+
+/**
+ * Ignite specific implementation of the {@link org.apache.gora.query.Result} 
interface.
+ */
+public class IgniteResult<K,T extends PersistentBase> extends ResultBase<K,T> {
+  
+  private RowIterator iterator;
+
+  /**
+   * Gets the data store used
+   */
+  public IgniteStore<K,T> getDataStore() {
+    return (IgniteStore<K,T>) super.getDataStore();
+  }
+
+  /**
+   * @param dataStore
+   * @param query
+   * @param scanner
+   */
+  public IgniteResult(DataStore<K,T> dataStore, Query<K,T> query, Scanner 
scanner) {
+    super(dataStore, query);
+    
+    if (this.limit > 0) {
+      scanner.setBatchSize((int) this.limit);
+    }
+    iterator = new RowIterator(scanner.iterator());
+  }
+
+  /**
+   * Gets the items reading progress
+   */
+  @Override
+  public float getProgress() throws IOException {
+    if (this.limit != -1) {
+      return (float) this.offset / (float) this.limit;
+    } else {
+      return 0;
+    }
+  }
+  
+  @Override
+  public void close() throws IOException {
+    
+  }
+
+  /**
+   * Gets the next item
+   */
+  @Override
+  protected boolean nextInner() throws IOException {
+    
+    if (!iterator.hasNext())
+      return false;
+    
+    key = null;
+    
+    Iterator<Entry<Key,Value>> nextRow = iterator.next();
+    ByteSequence row = getDataStore().populate(nextRow, persistent);
+    key = ((IgniteStore<K, T>) dataStore).fromBytes(getKeyClass(), 
row.toArray());
+    
+    return true;
+  }
+
+  @Override
+  public int size() {
+    return (int) this.limit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java
new file mode 100644
index 0000000..b1a306c
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains all the Ignite store query representation class as 
well as Result set representing class
+ * when query is executed over the Ignite dataStore.
+ */
+package org.apache.gora.ignite.query;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMapping.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMapping.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMapping.java
new file mode 100644
index 0000000..b46c063
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMapping.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.store;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ignite.core.util.Pair;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Mapping definitions for Ignite.
+ */
+public class IgniteMapping {
+
+  /**
+   * A map of field names to Field objects containing schema's fields
+   */
+  Map<String,Pair<Text,Text>> fieldMap = new HashMap<>();
+
+  /**
+   * Look up the column associated to the Avro field.
+   */
+  Map<Pair<Text,Text>,String> columnMap = new HashMap<>();
+
+  Map<String,String> tableConfig = new HashMap<>();
+  String tableName;
+  String encoder;
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java
new file mode 100644
index 0000000..2f5faf9
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java
@@ -0,0 +1,1034 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.store;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import org.apache.ignite.core.client.IgniteException;
+import org.apache.ignite.core.client.IgniteSecurityException;
+import org.apache.ignite.core.client.BatchWriter;
+import org.apache.ignite.core.client.BatchWriterConfig;
+import org.apache.ignite.core.client.Connector;
+import org.apache.ignite.core.client.IsolatedScanner;
+import org.apache.ignite.core.client.IteratorSetting;
+import org.apache.ignite.core.client.MutationsRejectedException;
+import org.apache.ignite.core.client.RowIterator;
+import org.apache.ignite.core.client.Scanner;
+import org.apache.ignite.core.client.TableDeletedException;
+import org.apache.ignite.core.client.TableExistsException;
+import org.apache.ignite.core.client.TableNotFoundException;
+import org.apache.ignite.core.client.TableOfflineException;
+import org.apache.ignite.core.client.ZooKeeperInstance;
+import org.apache.ignite.core.client.impl.ClientContext;
+import org.apache.ignite.core.client.impl.Tables;
+import org.apache.ignite.core.client.impl.TabletLocator;
+import org.apache.ignite.core.client.mock.MockConnector;
+import org.apache.ignite.core.client.mock.MockInstance;
+import org.apache.ignite.core.client.mock.impl.MockTabletLocator;
+import org.apache.ignite.core.client.security.tokens.AuthenticationToken;
+import org.apache.ignite.core.client.security.tokens.PasswordToken;
+import org.apache.ignite.core.conf.IgniteConfiguration;
+import org.apache.ignite.core.data.ByteSequence;
+import org.apache.ignite.core.data.Key;
+import org.apache.ignite.core.data.impl.KeyExtent;
+import org.apache.ignite.core.data.Mutation;
+import org.apache.ignite.core.data.Range;
+import org.apache.ignite.core.data.Value;
+import org.apache.ignite.core.iterators.SortedKeyIterator;
+import org.apache.ignite.core.iterators.user.TimestampFilter;
+import org.apache.ignite.core.master.state.tables.TableState;
+import org.apache.ignite.core.security.Authorizations;
+import org.apache.ignite.core.security.ColumnVisibility;
+import org.apache.ignite.core.client.impl.Credentials;
+import org.apache.ignite.core.util.Pair;
+import org.apache.ignite.core.util.UtilWaitThread;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.ignite.encoders.BinaryEncoder;
+import org.apache.gora.ignite.encoders.Encoder;
+import org.apache.gora.ignite.query.IgniteQuery;
+import org.apache.gora.ignite.query.IgniteResult;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
+import org.apache.gora.util.GoraException;
+import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * Implementation of a Ignite data store to be used by gora.
+ *
+ * @param <K>
+ *            class to be used for the key
+ * @param <T>
+ *            class to be persisted within the store
+ */
+public class IgniteStore<K,T extends PersistentBase> extends 
DataStoreBase<K,T> {
+
+  protected static final String MOCK_PROPERTY = "ignite.mock";
+  protected static final String INSTANCE_NAME_PROPERTY = "ignite.instance";
+  protected static final String ZOOKEEPERS_NAME_PROPERTY = "ignite.zookeepers";
+  protected static final String USERNAME_PROPERTY = "ignite.user";
+  protected static final String PASSWORD_PROPERTY = "ignite.password";
+  protected static final String DEFAULT_MAPPING_FILE = 
"gora-ignite-mapping.xml";
+
+  private final static String UNKOWN = "Unknown type ";
+
+  private Connector conn;
+  private BatchWriter batchWriter;
+  private IgniteMapping mapping;
+  private Credentials credentials;
+  private Encoder encoder;
+
+  public static final Logger LOG = LoggerFactory.getLogger(IgniteStore.class);
+
+  public Object fromBytes(Schema schema, byte[] data) throws IOException {
+    Schema fromSchema = null;
+    if (schema.getType() == Type.UNION) {
+      try {
+        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+        int unionIndex = decoder.readIndex();
+        List<Schema> possibleTypes = schema.getTypes();
+        fromSchema = possibleTypes.get(unionIndex);
+        Schema effectiveSchema = possibleTypes.get(unionIndex);
+        if (effectiveSchema.getType() == Type.NULL) {
+          decoder.readNull();
+          return null;
+        } else {
+          data = decoder.readBytes(null).array();
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage());
+        throw new GoraException("Error decoding union type: ", e);
+      }
+    } else {
+      fromSchema = schema;
+    }
+    return fromBytes(encoder, fromSchema, data);
+  }
+
+  public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) 
throws IOException {
+    switch (schema.getType()) {
+    case BOOLEAN:
+      return encoder.decodeBoolean(data);
+    case DOUBLE:
+      return encoder.decodeDouble(data);
+    case FLOAT:
+      return encoder.decodeFloat(data);
+    case INT:
+      return encoder.decodeInt(data);
+    case LONG:
+      return encoder.decodeLong(data);
+    case STRING:
+      return new Utf8(data);
+    case BYTES:
+      return ByteBuffer.wrap(data);
+    case ENUM:
+      return AvroUtils.getEnumValue(schema, encoder.decodeInt(data));
+    case ARRAY:
+      break;
+    case FIXED:
+      break;
+    case MAP:
+      break;
+    case NULL:
+      break;
+    case RECORD:
+      break;
+    case UNION:
+      break;
+    default:
+      break;
+    }
+    throw new IllegalArgumentException(UNKOWN + schema.getType());
+
+  }
+
+  private static byte[] getBytes(Text text) {
+    byte[] bytes = text.getBytes();
+    if (bytes.length != text.getLength()) {
+      bytes = new byte[text.getLength()];
+      System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length);
+    }
+    return bytes;
+  }
+
+  public K fromBytes(Class<K> clazz, byte[] val) {
+    return fromBytes(encoder, clazz, val);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) {
+    try {
+      if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
+        return (K) Byte.valueOf(encoder.decodeByte(val));
+      } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
+        return (K) Boolean.valueOf(encoder.decodeBoolean(val));
+      } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
+        return (K) Short.valueOf(encoder.decodeShort(val));
+      } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
+        return (K) Integer.valueOf(encoder.decodeInt(val));
+      } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
+        return (K) Long.valueOf(encoder.decodeLong(val));
+      } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
+        return (K) Float.valueOf(encoder.decodeFloat(val));
+      } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
+        return (K) Double.valueOf(encoder.decodeDouble(val));
+      } else if (clazz.equals(String.class)) {
+        return (K) new String(val, "UTF-8");
+      } else if (clazz.equals(Utf8.class)) {
+        return (K) new Utf8(val);
+      }
+
+      throw new IllegalArgumentException(UNKOWN + clazz.getName());
+    } catch (IOException ioe) {
+      LOG.error(ioe.getMessage());
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  private static byte[] copyIfNeeded(byte b[], int offset, int len) {
+    if (len != b.length || offset != 0) {
+      byte[] copy = new byte[len];
+      System.arraycopy(b, offset, copy, 0, copy.length);
+      b = copy;
+    }
+    return b;
+  }
+
+  public byte[] toBytes(Schema toSchema, Object o) {
+    if (toSchema != null && toSchema.getType() == Type.UNION) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      org.apache.avro.io.BinaryEncoder avroEncoder = 
EncoderFactory.get().binaryEncoder(baos, null);
+      int unionIndex = 0;
+      try {
+        if (o == null) {
+          unionIndex = firstNullSchemaTypeIndex(toSchema);
+          avroEncoder.writeIndex(unionIndex);
+          avroEncoder.writeNull();
+        } else {
+          unionIndex = firstNotNullSchemaTypeIndex(toSchema);
+          avroEncoder.writeIndex(unionIndex);
+          avroEncoder.writeBytes(toBytes(o));
+        }
+        avroEncoder.flush();
+        return baos.toByteArray();
+      } catch (IOException e) {
+        LOG.error(e.getMessage());
+        return toBytes(o);
+      }
+    } else {
+      return toBytes(o);
+    }
+  }
+
+  private int firstNullSchemaTypeIndex(Schema toSchema) {
+    List<Schema> possibleTypes = toSchema.getTypes();
+    int unionIndex = 0;
+    for (int i = 0; i < possibleTypes.size(); i++ ) {
+      Type pType = possibleTypes.get(i).getType();
+      if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests
+        unionIndex = i; break;
+      }
+    }
+    return unionIndex;
+  }
+
+  private int firstNotNullSchemaTypeIndex(Schema toSchema) {
+    List<Schema> possibleTypes = toSchema.getTypes();
+    int unionIndex = 0;
+    for (int i = 0; i < possibleTypes.size(); i++ ) {
+      Type pType = possibleTypes.get(i).getType();
+      if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests
+        unionIndex = i; break;
+      }
+    }
+    return unionIndex;
+  }
+
+  public byte[] toBytes(Object o) {
+    return toBytes(encoder, o);
+  }
+
+  public static byte[] toBytes(Encoder encoder, Object o) {
+
+    try {
+      if (o instanceof String) {
+        return ((String) o).getBytes("UTF-8");
+      } else if (o instanceof Utf8) {
+        return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) 
o).getByteLength());
+      } else if (o instanceof ByteBuffer) {
+        return copyIfNeeded(((ByteBuffer) o).array(), ((ByteBuffer) 
o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining());
+      } else if (o instanceof Long) {
+        return encoder.encodeLong((Long) o);
+      } else if (o instanceof Integer) {
+        return encoder.encodeInt((Integer) o);
+      } else if (o instanceof Short) {
+        return encoder.encodeShort((Short) o);
+      } else if (o instanceof Byte) {
+        return encoder.encodeByte((Byte) o);
+      } else if (o instanceof Boolean) {
+        return encoder.encodeBoolean((Boolean) o);
+      } else if (o instanceof Float) {
+        return encoder.encodeFloat((Float) o);
+      } else if (o instanceof Double) {
+        return encoder.encodeDouble((Double) o);
+      } else if (o instanceof Enum) {
+        return encoder.encodeInt(((Enum<?>) o).ordinal());
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+
+    throw new IllegalArgumentException(UNKOWN + o.getClass().getName());
+  }
+
+  private BatchWriter getBatchWriter() throws IOException {
+    if (batchWriter == null)
+      try {
+        BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
+        batchWriterConfig.setMaxMemory(10000000);
+        batchWriterConfig.setMaxLatency(60000L, TimeUnit.MILLISECONDS);
+        batchWriterConfig.setMaxWriteThreads(4);
+        batchWriter = conn.createBatchWriter(mapping.tableName, 
batchWriterConfig);
+      } catch (TableNotFoundException e) {
+        throw new IOException(e);
+      }
+    return batchWriter;
+  }
+
+  /**
+   * Initialize the data store by reading the credentials, setting the 
client's properties up and
+   * reading the mapping file. Initialize is called when then the call to
+   * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is made.
+   *
+   * @param keyClass
+   * @param persistentClass
+   * @param properties
+   */
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass, 
Properties properties) throws GoraException {
+    super.initialize(keyClass, persistentClass, properties);
+
+    try {
+      
+      String mock = DataStoreFactory.findProperty(properties, this, 
MOCK_PROPERTY, null);
+      String mappingFile = DataStoreFactory.getMappingFile(properties, this, 
DEFAULT_MAPPING_FILE);
+      String user = DataStoreFactory.findProperty(properties, this, 
USERNAME_PROPERTY, null);
+      String password = DataStoreFactory.findProperty(properties, this, 
PASSWORD_PROPERTY, null);
+
+      mapping = readMapping(mappingFile);
+
+      if (mapping.encoder == null || "".equals(mapping.encoder)) {
+        encoder = new BinaryEncoder();
+      } else {
+          encoder = (Encoder) 
getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
+      }
+
+      AuthenticationToken token = new PasswordToken(password);
+      if (mock == null || !mock.equals("true")) {
+        String instance = DataStoreFactory.findProperty(properties, this, 
INSTANCE_NAME_PROPERTY, null);
+        String zookeepers = DataStoreFactory.findProperty(properties, this, 
ZOOKEEPERS_NAME_PROPERTY, null);
+        conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, 
token);
+      } else {
+        conn = new MockInstance().getConnector(user, token);
+      }
+      credentials = new Credentials(user, token);
+
+      if (autoCreateSchema && !schemaExists())
+        createSchema();
+      
+    } catch (IOException | InstantiationException | IllegalAccessException |
+             ClassNotFoundException | IgniteException | 
IgniteSecurityException e) {
+      throw new GoraException(e);
+    }
+  }
+
+  protected IgniteMapping readMapping(String filename) throws IOException {
+    try {
+
+      IgniteMapping mapping = new IgniteMapping();
+
+      DocumentBuilder db = 
DocumentBuilderFactory.newInstance().newDocumentBuilder();
+      Document dom = 
db.parse(getClass().getClassLoader().getResourceAsStream(filename));
+
+      Element root = dom.getDocumentElement();
+
+      NodeList nl = root.getElementsByTagName("class");
+      for (int i = 0; i < nl.getLength(); i++) {
+
+        Element classElement = (Element) nl.item(i);
+        if 
(classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
+            && 
classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
+
+          mapping.tableName = 
getSchemaName(classElement.getAttribute("table"), persistentClass);
+          mapping.encoder = classElement.getAttribute("encoder");
+
+          NodeList fields = classElement.getElementsByTagName("field");
+          for (int j = 0; j < fields.getLength(); j++) {
+            Element fieldElement = (Element) fields.item(j);
+
+            String name = fieldElement.getAttribute("name");
+            String family = fieldElement.getAttribute("family");
+            String qualifier = fieldElement.getAttribute("qualifier");
+            if ("".equals(qualifier))
+              qualifier = null;
+
+            Pair<Text,Text> col = new Pair<>(new Text(family), qualifier == 
null ? null : new Text(qualifier));
+            mapping.fieldMap.put(name, col);
+            mapping.columnMap.put(col, name);
+          }
+        }
+
+      }
+
+      if (mapping.tableName == null) {
+        throw new GoraException("Please define the ignite 'table' name mapping 
in " + filename + " for " + persistentClass.getCanonicalName());
+      }
+
+      nl = root.getElementsByTagName("table");
+      for (int i = 0; i < nl.getLength(); i++) {
+        Element tableElement = (Element) nl.item(i);
+        if (tableElement.getAttribute("name").equals(mapping.tableName)) {
+          NodeList configs = tableElement.getElementsByTagName("config");
+          for (int j = 0; j < configs.getLength(); j++) {
+            Element configElement = (Element) configs.item(j);
+            String key = configElement.getAttribute("key");
+            String val = configElement.getAttribute("value");
+            mapping.tableConfig.put(key, val);
+          }
+        }
+      }
+
+      return mapping;
+    } catch (Exception ex) {
+      throw new IOException("Unable to read " + filename, ex);
+    }
+
+  }
+
+  @Override
+  public String getSchemaName() {
+    return mapping.tableName;
+  }
+
+  @Override
+  public void createSchema() throws GoraException {
+    try {
+      conn.tableOperations().create(mapping.tableName);
+      Set<Entry<String,String>> es = mapping.tableConfig.entrySet();
+      for (Entry<String,String> entry : es) {
+        conn.tableOperations().setProperty(mapping.tableName, entry.getKey(), 
entry.getValue());
+      }
+
+    } catch (TableExistsException e) {
+      LOG.debug(e.getMessage(), e);
+      // Assume this is not an error
+    } catch (IgniteException | IgniteSecurityException e) {
+      throw new GoraException(e);
+    }
+  }
+
+  @Override
+  public void deleteSchema() throws GoraException {
+    try {
+      if (batchWriter != null)
+        batchWriter.close();
+      batchWriter = null;
+      conn.tableOperations().delete(mapping.tableName);
+    } catch (TableNotFoundException e) {
+      // Ignore. Delete a non existant schema is a success
+    } catch (IgniteException | IgniteSecurityException e) {
+      throw new GoraException(e);
+    }
+  }
+
+  @Override
+  public boolean schemaExists() throws GoraException {
+    try {
+      return conn.tableOperations().exists(mapping.tableName);
+    } catch (Exception e) {
+      throw new GoraException(e);
+    }
+  }
+
+  public ByteSequence populate(Iterator<Entry<Key,Value>> iter, T persistent) 
throws IOException {
+    ByteSequence row = null;
+
+    Map<Utf8, Object> currentMap = null;
+    List currentArray = null;
+    Text currentFam = null;
+    int currentPos = 0;
+    Schema currentSchema = null;
+    Field currentField = null;
+
+    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new byte[0], 
null);
+
+    while (iter.hasNext()) {
+      Entry<Key,Value> entry = iter.next();
+
+      if (row == null) {
+        row = entry.getKey().getRowData();
+      }
+      byte[] val = entry.getValue().get();
+
+      Field field = fieldMap.get(getFieldName(entry));
+
+      if (currentMap != null) {
+        if (currentFam.equals(entry.getKey().getColumnFamily())) {
+          currentMap.put(new 
Utf8(entry.getKey().getColumnQualifierData().toArray()),
+              fromBytes(currentSchema, entry.getValue().get()));
+          continue;
+        } else {
+          persistent.put(currentPos, currentMap);
+          currentMap = null;
+        }
+      } else if (currentArray != null) {
+        if (currentFam.equals(entry.getKey().getColumnFamily())) {
+          currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
+          continue;
+        } else {
+          persistent.put(currentPos, new 
GenericData.Array<T>(currentField.schema(), currentArray));
+          currentArray = null;
+        }
+      }
+
+      switch (field.schema().getType()) {
+      case MAP:  // first entry only. Next are handled above on the next loop
+        currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>());
+        currentPos = field.pos();
+        currentFam = entry.getKey().getColumnFamily();
+        currentSchema = field.schema().getValueType();
+
+        currentMap.put(new 
Utf8(entry.getKey().getColumnQualifierData().toArray()),
+            fromBytes(currentSchema, entry.getValue().get()));
+        break;
+      case ARRAY:
+        currentArray = new DirtyListWrapper<>(new ArrayList<>());
+        currentPos = field.pos();
+        currentFam = entry.getKey().getColumnFamily();
+        currentSchema = field.schema().getElementType();
+        currentField = field;
+
+        currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
+
+        break;
+      case UNION:// default value of null acts like union with null
+        Schema effectiveSchema = field.schema().getTypes()
+        .get(firstNotNullSchemaTypeIndex(field.schema()));
+        // map and array were coded without union index so need to be read the 
same way
+        if (effectiveSchema.getType() == Type.ARRAY) {
+          currentArray = new DirtyListWrapper<>(new ArrayList<>());
+          currentPos = field.pos();
+          currentFam = entry.getKey().getColumnFamily();
+          currentSchema = field.schema().getElementType();
+          currentField = field;
+
+          currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
+          break;
+        }
+        else if (effectiveSchema.getType() == Type.MAP) {
+          currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>());
+          currentPos = field.pos();
+          currentFam = entry.getKey().getColumnFamily();
+          currentSchema = effectiveSchema.getValueType();
+
+          currentMap.put(new 
Utf8(entry.getKey().getColumnQualifierData().toArray()),
+              fromBytes(currentSchema, entry.getValue().get()));
+          break;
+        }
+        // continue like a regular top-level union
+      case RECORD:
+        SpecificDatumReader<?> reader = new 
SpecificDatumReader<Schema>(field.schema());
+        persistent.put(field.pos(), reader.read(null, 
DecoderFactory.get().binaryDecoder(val, decoder)));
+        break;
+      default:
+        persistent.put(field.pos(), fromBytes(field.schema(), 
entry.getValue().get()));
+      }
+    }
+
+    if (currentMap != null) {
+      persistent.put(currentPos, currentMap);
+    } else if (currentArray != null) {
+      persistent.put(currentPos, new 
GenericData.Array<T>(currentField.schema(), currentArray));
+    }
+
+    persistent.clearDirty();
+
+    return row;
+  }
+
+  /**
+   * Retrieve field name from entry.
+   * @param entry The Key-Value entry
+   * @return String The field name
+   */
+  private String getFieldName(Entry<Key, Value> entry) {
+    String fieldName = mapping.columnMap.get(new 
Pair<>(entry.getKey().getColumnFamily(),
+        entry.getKey().getColumnQualifier()));
+    if (fieldName == null) {
+      fieldName = mapping.columnMap.get(new 
Pair<Text,Text>(entry.getKey().getColumnFamily(), null));
+    }
+    return fieldName;
+  }
+
+  private void setFetchColumns(Scanner scanner, String[] fields) {
+    fields = getFieldsToQuery(fields);
+    for (String field : fields) {
+      Pair<Text,Text> col = mapping.fieldMap.get(field);
+      if (col != null) {
+        if (col.getSecond() == null) {
+          scanner.fetchColumnFamily(col.getFirst());
+        } else {
+          scanner.fetchColumn(col.getFirst(), col.getSecond());
+        }
+      } else {
+        LOG.error("Mapping not found for field: {}", field);
+      }
+    }
+  }
+
+  @Override
+  public T get(K key, String[] fields) throws GoraException {
+    try {
+      // TODO make isolated scanner optional?
+      Scanner scanner = new 
IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
+      Range rowRange = new Range(new Text(toBytes(key)));
+
+      scanner.setRange(rowRange);
+      setFetchColumns(scanner, fields);
+
+      T persistent = newPersistent();
+      ByteSequence row = populate(scanner.iterator(), persistent);
+      if (row == null)
+        return null;
+      return persistent;
+    } catch (Exception e) {
+      throw new GoraException(e);
+    }
+  }
+
+  @Override
+  public void put(K key, T val) throws GoraException {
+
+    try{
+      Mutation m = new Mutation(new Text(toBytes(key)));
+
+      Schema schema = val.getSchema();
+      List<Field> fields = schema.getFields();
+      int count = 0;
+
+      for (int i = 0; i < fields.size(); i++) {
+        if (!val.isDirty(i)) {
+          continue;
+        }
+        Field field = fields.get(i);
+
+        Object o = val.get(field.pos());
+
+        Pair<Text,Text> col = mapping.fieldMap.get(field.name());
+
+        if (col == null) {
+          throw new GoraException("Please define the gora to ignite mapping 
for field " + field.name());
+        }
+
+        switch (field.schema().getType()) {
+        case MAP:
+          count = putMap(m, count, field.schema().getValueType(), o, col, 
field.name());
+          break;
+        case ARRAY:
+          count = putArray(m, count, o, col, field.name());
+          break;
+        case UNION: // default value of null acts like union with null
+          Schema effectiveSchema = field.schema().getTypes()
+          .get(firstNotNullSchemaTypeIndex(field.schema()));
+          // map and array need to compute qualifier
+          if (effectiveSchema.getType() == Type.ARRAY) {
+            count = putArray(m, count, o, col, field.name());
+            break;
+          }
+          else if (effectiveSchema.getType() == Type.MAP) {
+            count = putMap(m, count, effectiveSchema.getValueType(), o, col, 
field.name());
+            break;
+          }
+          // continue like a regular top-level union
+        case RECORD:
+          final SpecificDatumWriter<Object> writer = new 
SpecificDatumWriter<>(field.schema());
+          final byte[] byteData = IOUtils.serialize(writer,o);
+          m.put(col.getFirst(), col.getSecond(), new Value(byteData));
+          count++;
+          break;
+        default:
+          m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
+          count++;
+        }
+
+      }
+
+      if (count > 0)
+        try {
+          getBatchWriter().addMutation(m);
+        } catch (MutationsRejectedException e) {
+          LOG.error(e.getMessage(), e);
+        }
+    } catch (GoraException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new GoraException(e);
+    }
+  }
+
+  private int putMap(Mutation m, int count, Schema valueType, Object o, 
Pair<Text, Text> col, String fieldName) throws GoraException {
+
+    // First of all we delete map field on ignite store
+    Text rowKey = new Text(m.getRow());
+    Query<K, T> query = newQuery();
+    query.setFields(fieldName);
+    query.setStartKey((K)rowKey.toString());
+    query.setEndKey((K)rowKey.toString());
+    deleteByQuery(query);
+    flush();
+    if (o == null){
+      return 0;
+    }
+
+    Set<?> es = ((Map<?, ?>)o).entrySet();
+    for (Object entry : es) {
+      Object mapKey = ((Entry<?, ?>) entry).getKey();
+      Object mapVal = ((Entry<?, ?>) entry).getValue();
+      if ((o instanceof DirtyMapWrapper && ((DirtyMapWrapper<?, 
?>)o).isDirty())
+          || !(o instanceof DirtyMapWrapper)) {
+        m.put(col.getFirst(), new Text(toBytes(mapKey)), new 
Value(toBytes(valueType, mapVal)));
+        count++;
+      }
+      // TODO map value deletion
+    }
+    return count;
+  }
+
+  private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col, 
String fieldName) throws GoraException {
+
+    // First of all we delete array field on ignite store
+    Text rowKey = new Text(m.getRow());
+    Query<K, T> query = newQuery();
+    query.setFields(fieldName);
+    query.setStartKey((K)rowKey.toString());
+    query.setEndKey((K)rowKey.toString());
+    deleteByQuery(query);
+    flush();
+    if (o == null){
+      return 0;
+    }
+
+    List<?> array = (List<?>) o;  // both GenericArray and DirtyListWrapper
+    int j = 0;
+    for (Object item : array) {
+      m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
+      count++;
+    }
+    return count;
+  }
+
+  @Override
+  public boolean delete(K key) throws GoraException {
+    Query<K,T> q = newQuery();
+    q.setKey(key);
+    return deleteByQuery(q) > 0;
+  }
+
+  @Override
+  public long deleteByQuery(Query<K,T> query) throws GoraException {
+    try {
+      Scanner scanner = createScanner(query);
+      // add iterator that drops values on the server side
+      scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, 
SortedKeyIterator.class));
+      RowIterator iterator = new RowIterator(scanner.iterator());
+
+      long count = 0;
+
+      while (iterator.hasNext()) {
+        Iterator<Entry<Key,Value>> row = iterator.next();
+        Mutation m = null;
+        while (row.hasNext()) {
+          Entry<Key,Value> entry = row.next();
+          Key key = entry.getKey();
+          if (m == null)
+            m = new Mutation(key.getRow());
+          // TODO optimize to avoid continually creating column vis? prob does 
not matter for empty
+          m.putDelete(key.getColumnFamily(), key.getColumnQualifier(), new 
ColumnVisibility(key.getColumnVisibility()), key.getTimestamp());
+        }
+        getBatchWriter().addMutation(m);
+        count++;
+      }
+
+      return count;
+    } catch (Exception e) {
+      throw new GoraException(e);
+    }
+  }
+
+  private Range createRange(Query<K,T> query) {
+    Text startRow = null;
+    Text endRow = null;
+
+    if (query.getStartKey() != null)
+      startRow = new Text(toBytes(query.getStartKey()));
+
+    if (query.getEndKey() != null)
+      endRow = new Text(toBytes(query.getEndKey()));
+
+    return new Range(startRow, true, endRow, true);
+
+  }
+
+  private Scanner createScanner(Query<K,T> query) throws 
TableNotFoundException {
+    // TODO make isolated scanner optional?
+    Scanner scanner = new 
IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
+    setFetchColumns(scanner, query.getFields());
+
+    scanner.setRange(createRange(query));
+
+    if (query.getStartTime() != -1 || query.getEndTime() != -1) {
+      IteratorSetting is = new IteratorSetting(30, TimestampFilter.class);
+      if (query.getStartTime() != -1)
+        TimestampFilter.setStart(is, query.getStartTime(), true);
+      if (query.getEndTime() != -1)
+        TimestampFilter.setEnd(is, query.getEndTime(), true);
+
+      scanner.addScanIterator(is);
+    }
+
+    return scanner;
+  }
+
+  /**
+   * Execute the query and return the result.
+   */
+  @Override
+  public Result<K,T> execute(Query<K,T> query) throws GoraException {
+    try {
+      Scanner scanner = createScanner(query);
+      return new IgniteResult<>(this, query, scanner);
+    } catch (TableNotFoundException e) {
+      throw new GoraException(e) ;
+    }
+  }
+
+  @Override
+  public Query<K,T> newQuery() {
+    return new IgniteQuery<>(this);
+  }
+
+  Text pad(Text key, int bytes) {
+    if (key.getLength() < bytes)
+      key = new Text(key);
+
+    while (key.getLength() < bytes) {
+      key.append(new byte[] {0}, 0, 1);
+    }
+
+    return key;
+  }
+
+  @Override
+  public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws 
GoraException {
+    try {
+      TabletLocator tl;
+      if (conn instanceof MockConnector)
+        tl = new MockTabletLocator();
+      else
+        tl = TabletLocator.getLocator(new ClientContext(conn.getInstance(), 
credentials, IgniteConfiguration.getTableConfiguration(conn, 
Tables.getTableId(conn.getInstance(), mapping.tableName))), new 
Text(Tables.getTableId(conn.getInstance(), mapping.tableName)));
+
+      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
+
+      tl.invalidateCache();
+      while (tl.binRanges(new ClientContext(conn.getInstance(), credentials, 
IgniteConfiguration.getTableConfiguration(conn, 
Tables.getTableId(conn.getInstance(), mapping.tableName))), 
Collections.singletonList(createRange(query)), binnedRanges).size() > 0) {
+        // TODO log?
+        if (!Tables.exists(conn.getInstance(), 
Tables.getTableId(conn.getInstance(), mapping.tableName)))
+          throw new 
TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName));
+        else if (Tables.getTableState(conn.getInstance(), 
Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE)
+          throw new TableOfflineException(conn.getInstance(), 
Tables.getTableId(conn.getInstance(), mapping.tableName));
+        UtilWaitThread.sleep(100);
+        tl.invalidateCache();
+      }
+
+      List<PartitionQuery<K,T>> ret = new ArrayList<>();
+
+      Text startRow = null;
+      Text endRow = null;
+      if (query.getStartKey() != null)
+        startRow = new Text(toBytes(query.getStartKey()));
+      if (query.getEndKey() != null)
+        endRow = new Text(toBytes(query.getEndKey()));
+
+      //hadoop expects hostnames, ignite keeps track of IPs... so need to 
convert
+      HashMap<String,String> hostNameCache = new HashMap<>();
+
+      for (Entry<String,Map<KeyExtent,List<Range>>> entry : 
binnedRanges.entrySet()) {
+        String ip = entry.getKey().split(":", 2)[0];
+        String location = hostNameCache.get(ip);
+        if (location == null) {
+          InetAddress inetAddress = InetAddress.getByName(ip);
+          location = inetAddress.getHostName();
+          hostNameCache.put(ip, location);
+        }
+
+        Map<KeyExtent,List<Range>> tablets = entry.getValue();
+        for (KeyExtent ke : tablets.keySet()) {
+
+          K startKey = null;
+          if (startRow == null || !ke.contains(startRow)) {
+            if (ke.getPrevEndRow() != null) {
+              startKey = followingKey(encoder, getKeyClass(), 
getBytes(ke.getPrevEndRow()));
+            }
+          } else {
+            startKey = fromBytes(getKeyClass(), getBytes(startRow));
+          }
+
+          K endKey = null;
+          if (endRow == null || !ke.contains(endRow)) {
+            if (ke.getEndRow() != null)
+              endKey = lastPossibleKey(encoder, getKeyClass(), 
getBytes(ke.getEndRow()));
+          } else {
+            endKey = fromBytes(getKeyClass(), getBytes(endRow));
+          }
+
+          PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<>(query, 
startKey, endKey, location);
+          pqi.setConf(getConf());
+          ret.add(pqi);
+        }
+      }
+
+      return ret;
+    } catch (Exception e) {
+      throw new GoraException(e);
+    }
+
+  }
+
+  static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) {
+
+    if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
+      throw new UnsupportedOperationException();
+    } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
+      throw new UnsupportedOperationException();
+    } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
+      return fromBytes(encoder, clazz, encoder.lastPossibleKey(2, er));
+    } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
+      return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er));
+    } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
+      return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er));
+    } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
+      return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er));
+    } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
+      return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er));
+    } else if (clazz.equals(String.class)) {
+      throw new UnsupportedOperationException();
+    } else if (clazz.equals(Utf8.class)) {
+      return fromBytes(encoder, clazz, er);
+    }
+
+    throw new IllegalArgumentException(UNKOWN + clazz.getName());
+  }
+
+  @SuppressWarnings("unchecked")
+  static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) {
+
+    if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
+      return (K) Byte.valueOf(encoder.followingKey(1, per)[0]);
+    } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
+      throw new UnsupportedOperationException();
+    } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
+      return fromBytes(encoder, clazz, encoder.followingKey(2, per));
+    } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
+      return fromBytes(encoder, clazz, encoder.followingKey(4, per));
+    } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
+      return fromBytes(encoder, clazz, encoder.followingKey(8, per));
+    } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
+      return fromBytes(encoder, clazz, encoder.followingKey(4, per));
+    } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
+      return fromBytes(encoder, clazz, encoder.followingKey(8, per));
+    } else if (clazz.equals(String.class)) {
+      throw new UnsupportedOperationException();
+    } else if (clazz.equals(Utf8.class)) {
+      return fromBytes(encoder, clazz, Arrays.copyOf(per, per.length + 1));
+    }
+
+    throw new IllegalArgumentException(UNKOWN + clazz.getName());
+  }
+
+  @Override
+  public void flush() throws GoraException {
+    try {
+      if (batchWriter != null) {
+        batchWriter.flush();
+      }
+    } catch (Exception e) {
+      throw new GoraException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (batchWriter != null) {
+        batchWriter.close();
+        batchWriter = null;
+      }
+    } catch (MutationsRejectedException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/store/package-info.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/store/package-info.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/store/package-info.java
new file mode 100644
index 0000000..62cba67
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/store/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains all the Ignite store related classes.
+ */
+package org.apache.gora.ignite.store;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/util/FixedByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/util/FixedByteArrayOutputStream.java
 
b/gora-ignite/src/main/java/org/apache/gora/ignite/util/FixedByteArrayOutputStream.java
new file mode 100644
index 0000000..97fb46a
--- /dev/null
+++ 
b/gora-ignite/src/main/java/org/apache/gora/ignite/util/FixedByteArrayOutputStream.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * It is a implementation of {@link java.io.OutputStream} must always provide 
at least a method that writes one byte of output.
+ */
+public class FixedByteArrayOutputStream extends OutputStream {
+  
+  private int i;
+  byte out[];
+  
+  public FixedByteArrayOutputStream(byte out[]) {
+    this.out = out;
+  }
+  
+  @Override
+  public void write(int b) throws IOException {
+    out[i++] = (byte) b;
+  }
+  
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {
+    System.arraycopy(b, off, out, i, len);
+    i += len;
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/util/package-info.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/util/package-info.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/util/package-info.java
new file mode 100644
index 0000000..eedb84b
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/util/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Ignite store related util classes.
+ */
+package org.apache.gora.ignite.util;
\ No newline at end of file

Reply via email to