create template gora-ignite module
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/4346216f Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/4346216f Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/4346216f Branch: refs/heads/master Commit: 4346216ffbed9a9adef2b90589436de543167c4c Parents: fb4b355 Author: Carlos Rodrigo Muñoz <[email protected]> Authored: Mon May 7 23:00:44 2018 -0500 Committer: Carlos Rodrigo Muñoz <[email protected]> Committed: Mon May 7 23:01:18 2018 -0500 ---------------------------------------------------------------------- gora-ignite/pom.xml | 177 +++ gora-ignite/src/examples/java/.gitignore | 15 + .../gora/ignite/encoders/BinaryEncoder.java | 191 ++++ .../apache/gora/ignite/encoders/Encoder.java | 72 ++ .../apache/gora/ignite/encoders/HexEncoder.java | 204 ++++ .../ignite/encoders/SignedBinaryEncoder.java | 110 ++ .../org/apache/gora/ignite/encoders/Utils.java | 91 ++ .../gora/ignite/encoders/package-info.java | 20 + .../org/apache/gora/ignite/package-info.java | 20 + .../apache/gora/ignite/query/IgniteQuery.java | 45 + .../apache/gora/ignite/query/IgniteResult.java | 101 ++ .../apache/gora/ignite/query/package-info.java | 21 + .../apache/gora/ignite/store/IgniteMapping.java | 44 + .../apache/gora/ignite/store/IgniteStore.java | 1034 ++++++++++++++++++ .../apache/gora/ignite/store/package-info.java | 20 + .../ignite/util/FixedByteArrayOutputStream.java | 45 + .../apache/gora/ignite/util/package-info.java | 20 + .../gora/ignite/GoraIgniteTestDriver.java | 73 ++ .../org/apache/gora/ignite/package-info.java | 21 + .../ignite/store/AuthenticationTokenTest.java | 90 ++ .../gora/ignite/store/IgniteStoreTest.java | 87 ++ .../apache/gora/ignite/store/PartitionTest.java | 96 ++ .../apache/gora/ignite/store/package-info.java | 21 + .../apache/gora/ignite/util/HexEncoderTest.java | 56 + .../ignite/util/SignedBinaryEncoderTest.java | 167 +++ .../apache/gora/ignite/util/package-info.java | 20 + .../test/resources/gora-accumulo-mapping.xml | 59 + gora-ignite/src/test/resources/gora.properties | 21 + pom.xml | 1 + 29 files changed, 2942 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/pom.xml ---------------------------------------------------------------------- diff --git a/gora-ignite/pom.xml b/gora-ignite/pom.xml new file mode 100644 index 0000000..ddd3a4a --- /dev/null +++ b/gora-ignite/pom.xml @@ -0,0 +1,177 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.gora</groupId> + <artifactId>gora</artifactId> + <version>0.9-SNAPSHOT</version> + <relativePath>../</relativePath> + </parent> + <artifactId>gora-ignite</artifactId> + <packaging>bundle</packaging> + + <name>Apache Gora :: Ignite</name> + <url>http://gora.apache.org</url> + <description>The Apache Gora open source framework provides an in-memory data model and + persistence for big data. Gora supports persisting to column stores, key value stores, + document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce + support.</description> + <inceptionYear>2010</inceptionYear> + <organization> + <name>The Apache Software Foundation</name> + <url>http://www.apache.org/</url> + </organization> + <issueManagement> + <system>JIRA</system> + <url>https://issues.apache.org/jira/browse/GORA</url> + </issueManagement> + <ciManagement> + <system>Jenkins</system> + <url>https://builds.apache.org/job/Gora-trunk/</url> + </ciManagement> + + <properties> + <ignite.version>1.7.1</ignite.version> + <osgi.import>*</osgi.import> + <osgi.export>org.apache.gora.ignite*;version="${project.version}";-noimport:=true</osgi.export> + </properties> + + <build> + <directory>target</directory> + <outputDirectory>target/classes</outputDirectory> + <finalName>${project.artifactId}-${project.version}</finalName> + <testOutputDirectory>target/test-classes</testOutputDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <sourceDirectory>src/main/java</sourceDirectory> + <testResources> + <testResource> + <directory>${project.basedir}/src/test/resources</directory> + <includes> + <include>**/*</include> + </includes> + <!--targetPath>${project.basedir}/target/classes/</targetPath --> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>${build-helper-maven-plugin.version}</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/examples/java</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <!-- Gora Internal Dependencies --> + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-core</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <!--Ignite Dependency --> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${ignite.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-minicluster</artifactId> + <version>${ignite.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + + <dependency> + <!-- Ignite mini was trying to load the Shell, which failed with the version of jline in the parent pom --> + <groupId>jline</groupId> + <artifactId>jline</artifactId> + <version>2.11</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + + <!-- Logging Dependencies --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <exclusions> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Testing Dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + </dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/examples/java/.gitignore ---------------------------------------------------------------------- diff --git a/gora-ignite/src/examples/java/.gitignore b/gora-ignite/src/examples/java/.gitignore new file mode 100644 index 0000000..09697dc --- /dev/null +++ b/gora-ignite/src/examples/java/.gitignore @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/BinaryEncoder.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/BinaryEncoder.java b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/BinaryEncoder.java new file mode 100644 index 0000000..ce6eded --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/BinaryEncoder.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.ignite.encoders; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.gora.ignite.util.FixedByteArrayOutputStream; + +/** + * + */ +public class BinaryEncoder implements Encoder { + + @Override + public byte[] encodeShort(short s) throws IOException { + return encodeShort(s, new byte[2]); + } + + @Override + public byte[] encodeShort(short s, byte[] ret) throws IOException { + try (DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret))){ + dos.writeShort(s); + dos.close(); + return ret; + } + } + + @Override + public short decodeShort(byte[] a) throws IOException { + try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(a))){ + short s = dis.readShort(); + dis.close(); + return s; + } + } + + @Override + public byte[] encodeInt(int i) throws IOException { + return encodeInt(i, new byte[4]); + } + + @Override + public byte[] encodeInt(int i, byte[] ret) throws IOException { + try (DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret))){ + dos.writeInt(i); + dos.close(); + return ret; + } + } + + @Override + public int decodeInt(byte[] a) throws IOException { + try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(a))){ + int i = dis.readInt(); + dis.close(); + return i; + } + } + + @Override + public byte[] encodeLong(long l) throws IOException { + return encodeLong(l, new byte[8]); + } + + @Override + public byte[] encodeLong(long l, byte[] ret) throws IOException { + try (DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret))){ + dos.writeLong(l); + dos.close(); + return ret; + } + } + + @Override + public long decodeLong(byte[] a) throws IOException { + try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(a))){ + long l = dis.readLong(); + dis.close(); + return l; + } + } + + @Override + public byte[] encodeDouble(double d) throws IOException { + return encodeDouble(d, new byte[8]); + } + + @Override + public byte[] encodeDouble(double d, byte[] ret) throws IOException { + try (DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret))){ + long l = Double.doubleToRawLongBits(d); + dos.writeLong(l); + dos.close(); + return ret; + } + } + + @Override + public double decodeDouble(byte[] a) throws IOException { + try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(a))){ + long l = dis.readLong(); + dis.close(); + return Double.longBitsToDouble(l); + } + } + + @Override + public byte[] encodeFloat(float d) throws IOException { + return encodeFloat(d, new byte[4]); + } + + @Override + public byte[] encodeFloat(float f, byte[] ret) throws IOException { + try (DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret))){ + int i = Float.floatToRawIntBits(f); + dos.writeInt(i); + return ret; + } + } + + @Override + public float decodeFloat(byte[] a) throws IOException { + try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(a))){ + int i = dis.readInt(); + return Float.intBitsToFloat(i); + } + } + + @Override + public byte[] encodeByte(byte b, byte[] ret) { + ret[0] = 0; + return ret; + } + + @Override + public byte[] encodeByte(byte b) { + return encodeByte(b, new byte[1]); + } + + @Override + public byte decodeByte(byte[] a) { + return a[0]; + } + + @Override + public boolean decodeBoolean(byte[] a) throws IOException { + try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(a))){ + return dis.readBoolean(); + } + } + + @Override + public byte[] encodeBoolean(boolean b) throws IOException { + return encodeBoolean(b, new byte[1]); + } + + @Override + public byte[] encodeBoolean(boolean b, byte[] ret) throws IOException { + try (DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret))){ + dos.writeBoolean(b); + return ret; + } + } + + @Override + public byte[] lastPossibleKey(int size, byte[] er) { + return Utils.lastPossibleKey(size, er); + } + + @Override + public byte[] followingKey(int size, byte[] per) { + return Utils.followingKey(size, per); + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Encoder.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Encoder.java b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Encoder.java new file mode 100644 index 0000000..deea4a7 --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Encoder.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.ignite.encoders; + +import java.io.IOException; + +/** + * + */ +public interface Encoder { + + public byte[] encodeByte(byte b, byte[] ret); + + public byte[] encodeByte(byte b); + + public byte decodeByte(byte[] a); + + public byte[] encodeShort(short s) throws IOException; + + public byte[] encodeShort(short s, byte[] ret) throws IOException; + + public short decodeShort(byte[] a) throws IOException; + + public byte[] encodeInt(int i) throws IOException; + + public byte[] encodeInt(int i, byte[] ret) throws IOException; + + public int decodeInt(byte[] a) throws IOException; + + public byte[] encodeLong(long l) throws IOException; + + public byte[] encodeLong(long l, byte[] ret) throws IOException; + + public long decodeLong(byte[] a) throws IOException; + + public byte[] encodeDouble(double d) throws IOException; + + public byte[] encodeDouble(double d, byte[] ret) throws IOException; + + public double decodeDouble(byte[] a) throws IOException; + + public byte[] encodeFloat(float d) throws IOException; + + public byte[] encodeFloat(float f, byte[] ret) throws IOException; + + public float decodeFloat(byte[] a) throws IOException; + + public boolean decodeBoolean(byte[] val) throws IOException; + + public byte[] encodeBoolean(boolean b) throws IOException; + + public byte[] encodeBoolean(boolean b, byte[] ret) throws IOException; + + byte[] followingKey(int size, byte[] per); + + byte[] lastPossibleKey(int size, byte[] er); + +} http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/HexEncoder.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/HexEncoder.java b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/HexEncoder.java new file mode 100644 index 0000000..8568ba9 --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/HexEncoder.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.ignite.encoders; + +/** + * Encodes data in a ascii hex representation + */ + +public class HexEncoder implements Encoder { + + private byte[] chars = new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + + private void encode(byte[] a, long l) { + for (int i = a.length - 1; i >= 0; i--) { + a[i] = chars[(int) (l & 0x0f)]; + l = l >>> 4; + } + } + + private int fromChar(byte b) { + if (b >= '0' && b <= '9') { + return b - '0'; + } else if (b >= 'a' && b <= 'f') { + return b - 'a' + 10; + } + + throw new IllegalArgumentException("Bad char " + b); + } + + private long decode(byte[] a) { + long b = 0; + for (byte anA : a) { + b = b << 4; + b |= fromChar(anA); + } + + return b; + } + + @Override + public byte[] encodeByte(byte b, byte[] ret) { + encode(ret, 0xff & b); + return ret; + } + + @Override + public byte[] encodeByte(byte b) { + return encodeByte(b, new byte[2]); + } + + @Override + public byte decodeByte(byte[] a) { + return (byte) decode(a); + } + + @Override + public byte[] encodeShort(short s) { + return encodeShort(s, new byte[4]); + } + + @Override + public byte[] encodeShort(short s, byte[] ret) { + encode(ret, 0xffff & s); + return ret; + } + + @Override + public short decodeShort(byte[] a) { + return (short) decode(a); + } + + @Override + public byte[] encodeInt(int i) { + return encodeInt(i, new byte[8]); + } + + @Override + public byte[] encodeInt(int i, byte[] ret) { + encode(ret, i); + return ret; + } + + @Override + public int decodeInt(byte[] a) { + return (int) decode(a); + } + + @Override + public byte[] encodeLong(long l) { + return encodeLong(l, new byte[16]); + } + + @Override + public byte[] encodeLong(long l, byte[] ret) { + encode(ret, l); + return ret; + } + + @Override + public long decodeLong(byte[] a) { + return decode(a); + } + + @Override + public byte[] encodeDouble(double d) { + return encodeDouble(d, new byte[16]); + } + + @Override + public byte[] encodeDouble(double d, byte[] ret) { + return encodeLong(Double.doubleToRawLongBits(d), ret); + } + + @Override + public double decodeDouble(byte[] a) { + return Double.longBitsToDouble(decodeLong(a)); + } + + @Override + public byte[] encodeFloat(float d) { + return encodeFloat(d, new byte[16]); + } + + @Override + public byte[] encodeFloat(float d, byte[] ret) { + return encodeInt(Float.floatToRawIntBits(d), ret); + } + + @Override + public float decodeFloat(byte[] a) { + return Float.intBitsToFloat(decodeInt(a)); + } + + @Override + public boolean decodeBoolean(byte[] val) { + return decodeByte(val) == 1; + } + + @Override + public byte[] encodeBoolean(boolean b) { + return encodeBoolean(b, new byte[2]); + } + + @Override + public byte[] encodeBoolean(boolean b, byte[] ret) { + if (b) + encode(ret, 1); + else + encode(ret, 0); + + return ret; + } + + private byte[] toBinary(byte[] hex) { + byte[] bin = new byte[(hex.length / 2) + (hex.length % 2)]; + + int j = 0; + for (int i = 0; i < bin.length; i++) { + bin[i] = (byte) (fromChar(hex[j++]) << 4); + if (j >= hex.length) + break; + bin[i] |= (byte) fromChar(hex[j++]); + } + + return bin; + } + + private byte[] fromBinary(byte[] bin) { + byte[] hex = new byte[bin.length * 2]; + + int j = 0; + for (byte aBin : bin) { + hex[j++] = chars[0x0f & (aBin >>> 4)]; + hex[j++] = chars[0x0f & aBin]; + } + + return hex; + } + + @Override + public byte[] followingKey(int size, byte[] per) { + return fromBinary(Utils.followingKey(size, toBinary(per))); + } + + @Override + public byte[] lastPossibleKey(int size, byte[] er) { + return fromBinary(Utils.lastPossibleKey(size, toBinary(er))); + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/SignedBinaryEncoder.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/SignedBinaryEncoder.java b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/SignedBinaryEncoder.java new file mode 100644 index 0000000..a8216f4 --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/SignedBinaryEncoder.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.ignite.encoders; + +import java.io.IOException; + +/** + * This class transforms this bits within a primitive type so that + * the bit representation sorts correctly lexographicaly. Primarily + * it does some simple transformations so that negative numbers sort + * before positive numbers, when compared lexographically. + */ +public class SignedBinaryEncoder extends BinaryEncoder { + + @Override + public byte[] encodeShort(short s, byte[] ret) throws IOException{ + s = (short)((s & 0xffff) ^ 0x8000); + return super.encodeShort(s, ret); + } + + @Override + public short decodeShort(byte[] a) throws IOException{ + short s = super.decodeShort(a); + s = (short)((s & 0xffff) ^ 0x8000); + return s; + } + + @Override + public byte[] encodeInt(int i, byte[] ret) throws IOException{ + i = i ^ 0x80000000; + return super.encodeInt(i, ret); + } + + @Override + public int decodeInt(byte[] a) throws IOException{ + int i = super.decodeInt(a); + i = i ^ 0x80000000; + return i; + } + + @Override + public byte[] encodeLong(long l, byte[] ret) throws IOException{ + l = l ^ 0x8000000000000000L; + return super.encodeLong(l, ret); + } + + @Override + public long decodeLong(byte[] a) throws IOException { + long l = super.decodeLong(a); + l = l ^ 0x8000000000000000L; + return l; + } + + @Override + public byte[] encodeDouble(double d, byte[] ret) throws IOException { + long l = Double.doubleToRawLongBits(d); + if(l < 0) + l = ~l; + else + l = l ^ 0x8000000000000000L; + return super.encodeLong(l,ret); + } + + @Override + public double decodeDouble(byte[] a) throws IOException{ + long l = super.decodeLong(a); + if(l < 0) + l = l ^ 0x8000000000000000L; + else + l = ~l; + return Double.longBitsToDouble(l); + } + + @Override + public byte[] encodeFloat(float f, byte[] ret) throws IOException { + int i = Float.floatToRawIntBits(f); + if(i < 0) + i = ~i; + else + i = i ^ 0x80000000; + + return super.encodeInt(i, ret); + + } + + @Override + public float decodeFloat(byte[] a) throws IOException{ + int i = super.decodeInt(a); + if(i < 0) + i = i ^ 0x80000000; + else + i = ~i; + return Float.intBitsToFloat(i); + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Utils.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Utils.java b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Utils.java new file mode 100644 index 0000000..8a5980c --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/Utils.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.ignite.encoders; + +import java.math.BigInteger; +import java.util.Arrays; + +/** + * + */ +public class Utils { + private static BigInteger newPositiveBigInteger(byte[] er) { + byte[] copy = new byte[er.length + 1]; + System.arraycopy(er, 0, copy, 1, er.length); + BigInteger bi = new BigInteger(copy); + return bi; + } + + public static byte[] lastPossibleKey(int size, byte[] er) { + if (size == er.length) + return er; + + if (er.length > size) + throw new IllegalArgumentException(); + + BigInteger bi = newPositiveBigInteger(er); + if (bi.equals(BigInteger.ZERO)) + throw new IllegalArgumentException("Nothing comes before zero"); + + bi = bi.subtract(BigInteger.ONE); + + byte[] ret = new byte[size]; + Arrays.fill(ret, (byte) 0xff); + + System.arraycopy(getBytes(bi, er.length), 0, ret, 0, er.length); + + return ret; + } + + private static byte[] getBytes(BigInteger bi, int minLen) { + byte[] ret = bi.toByteArray(); + + if (ret[0] == 0) { + // remove leading 0 that makes num positive + byte[] copy = new byte[ret.length - 1]; + System.arraycopy(ret, 1, copy, 0, copy.length); + ret = copy; + } + + // leading digits are dropped + byte[] copy = new byte[minLen]; + if (bi.compareTo(BigInteger.ZERO) < 0) { + Arrays.fill(copy, (byte) 0xff); + } + System.arraycopy(ret, 0, copy, minLen - ret.length, ret.length); + + return copy; + } + + public static byte[] followingKey(int size, byte[] per) { + + if (per.length > size) + throw new IllegalArgumentException(); + + if (size == per.length) { + // add one + BigInteger bi = new BigInteger(per); + bi = bi.add(BigInteger.ONE); + if (bi.equals(BigInteger.ZERO)) { + throw new IllegalArgumentException("Wrapped"); + } + return getBytes(bi, size); + } else { + return Arrays.copyOf(per, size); + } + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/package-info.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/package-info.java b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/package-info.java new file mode 100644 index 0000000..574aa24 --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/encoders/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains Ignite store related util classes for encoder. + */ +package org.apache.gora.ignite.encoders; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/package-info.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/package-info.java b/gora-ignite/src/main/java/org/apache/gora/ignite/package-info.java new file mode 100644 index 0000000..a7fa7ab --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains Ignite datastore related all classes. + */ +package org.apache.gora.ignite; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java new file mode 100644 index 0000000..85a59c9 --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.ignite.query; + +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.impl.QueryBase; +import org.apache.gora.store.DataStore; + +/** + * Ignite specific implementation of the {@link org.apache.gora.query.Query} interface. + */ +public class IgniteQuery<K,T extends PersistentBase> extends QueryBase<K,T> { + + /** + * Constructor for the query + */ + public IgniteQuery() { + super(null); + } + + /** + * Constructor for the query + * + * @param dataStore Data store used + * + */ + public IgniteQuery(DataStore<K,T> dataStore) { + super(dataStore); + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java new file mode 100644 index 0000000..416e650 --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.ignite.query; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.ignite.core.client.RowIterator; +import org.apache.ignite.core.client.Scanner; +import org.apache.ignite.core.data.ByteSequence; +import org.apache.ignite.core.data.Key; +import org.apache.ignite.core.data.Value; +import org.apache.gora.ignite.store.IgniteStore; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.query.impl.ResultBase; +import org.apache.gora.store.DataStore; + +/** + * Ignite specific implementation of the {@link org.apache.gora.query.Result} interface. + */ +public class IgniteResult<K,T extends PersistentBase> extends ResultBase<K,T> { + + private RowIterator iterator; + + /** + * Gets the data store used + */ + public IgniteStore<K,T> getDataStore() { + return (IgniteStore<K,T>) super.getDataStore(); + } + + /** + * @param dataStore + * @param query + * @param scanner + */ + public IgniteResult(DataStore<K,T> dataStore, Query<K,T> query, Scanner scanner) { + super(dataStore, query); + + if (this.limit > 0) { + scanner.setBatchSize((int) this.limit); + } + iterator = new RowIterator(scanner.iterator()); + } + + /** + * Gets the items reading progress + */ + @Override + public float getProgress() throws IOException { + if (this.limit != -1) { + return (float) this.offset / (float) this.limit; + } else { + return 0; + } + } + + @Override + public void close() throws IOException { + + } + + /** + * Gets the next item + */ + @Override + protected boolean nextInner() throws IOException { + + if (!iterator.hasNext()) + return false; + + key = null; + + Iterator<Entry<Key,Value>> nextRow = iterator.next(); + ByteSequence row = getDataStore().populate(nextRow, persistent); + key = ((IgniteStore<K, T>) dataStore).fromBytes(getKeyClass(), row.toArray()); + + return true; + } + + @Override + public int size() { + return (int) this.limit; + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java b/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java new file mode 100644 index 0000000..b1a306c --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains all the Ignite store query representation class as well as Result set representing class + * when query is executed over the Ignite dataStore. + */ +package org.apache.gora.ignite.query; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMapping.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMapping.java b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMapping.java new file mode 100644 index 0000000..b46c063 --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMapping.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.ignite.store; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.ignite.core.util.Pair; +import org.apache.hadoop.io.Text; + +/** + * Mapping definitions for Ignite. + */ +public class IgniteMapping { + + /** + * A map of field names to Field objects containing schema's fields + */ + Map<String,Pair<Text,Text>> fieldMap = new HashMap<>(); + + /** + * Look up the column associated to the Avro field. + */ + Map<Pair<Text,Text>,String> columnMap = new HashMap<>(); + + Map<String,String> tableConfig = new HashMap<>(); + String tableName; + String encoder; + +} http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java new file mode 100644 index 0000000..2f5faf9 --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java @@ -0,0 +1,1034 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.ignite.store; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; + +import org.apache.ignite.core.client.IgniteException; +import org.apache.ignite.core.client.IgniteSecurityException; +import org.apache.ignite.core.client.BatchWriter; +import org.apache.ignite.core.client.BatchWriterConfig; +import org.apache.ignite.core.client.Connector; +import org.apache.ignite.core.client.IsolatedScanner; +import org.apache.ignite.core.client.IteratorSetting; +import org.apache.ignite.core.client.MutationsRejectedException; +import org.apache.ignite.core.client.RowIterator; +import org.apache.ignite.core.client.Scanner; +import org.apache.ignite.core.client.TableDeletedException; +import org.apache.ignite.core.client.TableExistsException; +import org.apache.ignite.core.client.TableNotFoundException; +import org.apache.ignite.core.client.TableOfflineException; +import org.apache.ignite.core.client.ZooKeeperInstance; +import org.apache.ignite.core.client.impl.ClientContext; +import org.apache.ignite.core.client.impl.Tables; +import org.apache.ignite.core.client.impl.TabletLocator; +import org.apache.ignite.core.client.mock.MockConnector; +import org.apache.ignite.core.client.mock.MockInstance; +import org.apache.ignite.core.client.mock.impl.MockTabletLocator; +import org.apache.ignite.core.client.security.tokens.AuthenticationToken; +import org.apache.ignite.core.client.security.tokens.PasswordToken; +import org.apache.ignite.core.conf.IgniteConfiguration; +import org.apache.ignite.core.data.ByteSequence; +import org.apache.ignite.core.data.Key; +import org.apache.ignite.core.data.impl.KeyExtent; +import org.apache.ignite.core.data.Mutation; +import org.apache.ignite.core.data.Range; +import org.apache.ignite.core.data.Value; +import org.apache.ignite.core.iterators.SortedKeyIterator; +import org.apache.ignite.core.iterators.user.TimestampFilter; +import org.apache.ignite.core.master.state.tables.TableState; +import org.apache.ignite.core.security.Authorizations; +import org.apache.ignite.core.security.ColumnVisibility; +import org.apache.ignite.core.client.impl.Credentials; +import org.apache.ignite.core.util.Pair; +import org.apache.ignite.core.util.UtilWaitThread; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericData; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.util.Utf8; +import org.apache.gora.ignite.encoders.BinaryEncoder; +import org.apache.gora.ignite.encoders.Encoder; +import org.apache.gora.ignite.query.IgniteQuery; +import org.apache.gora.ignite.query.IgniteResult; +import org.apache.gora.persistency.impl.DirtyListWrapper; +import org.apache.gora.persistency.impl.DirtyMapWrapper; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.PartitionQuery; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.query.impl.PartitionQueryImpl; +import org.apache.gora.store.DataStoreFactory; +import org.apache.gora.store.impl.DataStoreBase; +import org.apache.gora.util.AvroUtils; +import org.apache.gora.util.GoraException; +import org.apache.gora.util.IOUtils; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; + +/** + * Implementation of a Ignite data store to be used by gora. + * + * @param <K> + * class to be used for the key + * @param <T> + * class to be persisted within the store + */ +public class IgniteStore<K,T extends PersistentBase> extends DataStoreBase<K,T> { + + protected static final String MOCK_PROPERTY = "ignite.mock"; + protected static final String INSTANCE_NAME_PROPERTY = "ignite.instance"; + protected static final String ZOOKEEPERS_NAME_PROPERTY = "ignite.zookeepers"; + protected static final String USERNAME_PROPERTY = "ignite.user"; + protected static final String PASSWORD_PROPERTY = "ignite.password"; + protected static final String DEFAULT_MAPPING_FILE = "gora-ignite-mapping.xml"; + + private final static String UNKOWN = "Unknown type "; + + private Connector conn; + private BatchWriter batchWriter; + private IgniteMapping mapping; + private Credentials credentials; + private Encoder encoder; + + public static final Logger LOG = LoggerFactory.getLogger(IgniteStore.class); + + public Object fromBytes(Schema schema, byte[] data) throws IOException { + Schema fromSchema = null; + if (schema.getType() == Type.UNION) { + try { + Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); + int unionIndex = decoder.readIndex(); + List<Schema> possibleTypes = schema.getTypes(); + fromSchema = possibleTypes.get(unionIndex); + Schema effectiveSchema = possibleTypes.get(unionIndex); + if (effectiveSchema.getType() == Type.NULL) { + decoder.readNull(); + return null; + } else { + data = decoder.readBytes(null).array(); + } + } catch (IOException e) { + LOG.error(e.getMessage()); + throw new GoraException("Error decoding union type: ", e); + } + } else { + fromSchema = schema; + } + return fromBytes(encoder, fromSchema, data); + } + + public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) throws IOException { + switch (schema.getType()) { + case BOOLEAN: + return encoder.decodeBoolean(data); + case DOUBLE: + return encoder.decodeDouble(data); + case FLOAT: + return encoder.decodeFloat(data); + case INT: + return encoder.decodeInt(data); + case LONG: + return encoder.decodeLong(data); + case STRING: + return new Utf8(data); + case BYTES: + return ByteBuffer.wrap(data); + case ENUM: + return AvroUtils.getEnumValue(schema, encoder.decodeInt(data)); + case ARRAY: + break; + case FIXED: + break; + case MAP: + break; + case NULL: + break; + case RECORD: + break; + case UNION: + break; + default: + break; + } + throw new IllegalArgumentException(UNKOWN + schema.getType()); + + } + + private static byte[] getBytes(Text text) { + byte[] bytes = text.getBytes(); + if (bytes.length != text.getLength()) { + bytes = new byte[text.getLength()]; + System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length); + } + return bytes; + } + + public K fromBytes(Class<K> clazz, byte[] val) { + return fromBytes(encoder, clazz, val); + } + + @SuppressWarnings("unchecked") + public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) { + try { + if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { + return (K) Byte.valueOf(encoder.decodeByte(val)); + } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { + return (K) Boolean.valueOf(encoder.decodeBoolean(val)); + } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { + return (K) Short.valueOf(encoder.decodeShort(val)); + } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { + return (K) Integer.valueOf(encoder.decodeInt(val)); + } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { + return (K) Long.valueOf(encoder.decodeLong(val)); + } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { + return (K) Float.valueOf(encoder.decodeFloat(val)); + } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { + return (K) Double.valueOf(encoder.decodeDouble(val)); + } else if (clazz.equals(String.class)) { + return (K) new String(val, "UTF-8"); + } else if (clazz.equals(Utf8.class)) { + return (K) new Utf8(val); + } + + throw new IllegalArgumentException(UNKOWN + clazz.getName()); + } catch (IOException ioe) { + LOG.error(ioe.getMessage()); + throw new RuntimeException(ioe); + } + } + + private static byte[] copyIfNeeded(byte b[], int offset, int len) { + if (len != b.length || offset != 0) { + byte[] copy = new byte[len]; + System.arraycopy(b, offset, copy, 0, copy.length); + b = copy; + } + return b; + } + + public byte[] toBytes(Schema toSchema, Object o) { + if (toSchema != null && toSchema.getType() == Type.UNION) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + org.apache.avro.io.BinaryEncoder avroEncoder = EncoderFactory.get().binaryEncoder(baos, null); + int unionIndex = 0; + try { + if (o == null) { + unionIndex = firstNullSchemaTypeIndex(toSchema); + avroEncoder.writeIndex(unionIndex); + avroEncoder.writeNull(); + } else { + unionIndex = firstNotNullSchemaTypeIndex(toSchema); + avroEncoder.writeIndex(unionIndex); + avroEncoder.writeBytes(toBytes(o)); + } + avroEncoder.flush(); + return baos.toByteArray(); + } catch (IOException e) { + LOG.error(e.getMessage()); + return toBytes(o); + } + } else { + return toBytes(o); + } + } + + private int firstNullSchemaTypeIndex(Schema toSchema) { + List<Schema> possibleTypes = toSchema.getTypes(); + int unionIndex = 0; + for (int i = 0; i < possibleTypes.size(); i++ ) { + Type pType = possibleTypes.get(i).getType(); + if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests + unionIndex = i; break; + } + } + return unionIndex; + } + + private int firstNotNullSchemaTypeIndex(Schema toSchema) { + List<Schema> possibleTypes = toSchema.getTypes(); + int unionIndex = 0; + for (int i = 0; i < possibleTypes.size(); i++ ) { + Type pType = possibleTypes.get(i).getType(); + if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests + unionIndex = i; break; + } + } + return unionIndex; + } + + public byte[] toBytes(Object o) { + return toBytes(encoder, o); + } + + public static byte[] toBytes(Encoder encoder, Object o) { + + try { + if (o instanceof String) { + return ((String) o).getBytes("UTF-8"); + } else if (o instanceof Utf8) { + return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) o).getByteLength()); + } else if (o instanceof ByteBuffer) { + return copyIfNeeded(((ByteBuffer) o).array(), ((ByteBuffer) o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining()); + } else if (o instanceof Long) { + return encoder.encodeLong((Long) o); + } else if (o instanceof Integer) { + return encoder.encodeInt((Integer) o); + } else if (o instanceof Short) { + return encoder.encodeShort((Short) o); + } else if (o instanceof Byte) { + return encoder.encodeByte((Byte) o); + } else if (o instanceof Boolean) { + return encoder.encodeBoolean((Boolean) o); + } else if (o instanceof Float) { + return encoder.encodeFloat((Float) o); + } else if (o instanceof Double) { + return encoder.encodeDouble((Double) o); + } else if (o instanceof Enum) { + return encoder.encodeInt(((Enum<?>) o).ordinal()); + } + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + + throw new IllegalArgumentException(UNKOWN + o.getClass().getName()); + } + + private BatchWriter getBatchWriter() throws IOException { + if (batchWriter == null) + try { + BatchWriterConfig batchWriterConfig = new BatchWriterConfig(); + batchWriterConfig.setMaxMemory(10000000); + batchWriterConfig.setMaxLatency(60000L, TimeUnit.MILLISECONDS); + batchWriterConfig.setMaxWriteThreads(4); + batchWriter = conn.createBatchWriter(mapping.tableName, batchWriterConfig); + } catch (TableNotFoundException e) { + throw new IOException(e); + } + return batchWriter; + } + + /** + * Initialize the data store by reading the credentials, setting the client's properties up and + * reading the mapping file. Initialize is called when then the call to + * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is made. + * + * @param keyClass + * @param persistentClass + * @param properties + */ + @Override + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { + super.initialize(keyClass, persistentClass, properties); + + try { + + String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null); + String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE); + String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null); + String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null); + + mapping = readMapping(mappingFile); + + if (mapping.encoder == null || "".equals(mapping.encoder)) { + encoder = new BinaryEncoder(); + } else { + encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance(); + } + + AuthenticationToken token = new PasswordToken(password); + if (mock == null || !mock.equals("true")) { + String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null); + String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null); + conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, token); + } else { + conn = new MockInstance().getConnector(user, token); + } + credentials = new Credentials(user, token); + + if (autoCreateSchema && !schemaExists()) + createSchema(); + + } catch (IOException | InstantiationException | IllegalAccessException | + ClassNotFoundException | IgniteException | IgniteSecurityException e) { + throw new GoraException(e); + } + } + + protected IgniteMapping readMapping(String filename) throws IOException { + try { + + IgniteMapping mapping = new IgniteMapping(); + + DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder(); + Document dom = db.parse(getClass().getClassLoader().getResourceAsStream(filename)); + + Element root = dom.getDocumentElement(); + + NodeList nl = root.getElementsByTagName("class"); + for (int i = 0; i < nl.getLength(); i++) { + + Element classElement = (Element) nl.item(i); + if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName()) + && classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) { + + mapping.tableName = getSchemaName(classElement.getAttribute("table"), persistentClass); + mapping.encoder = classElement.getAttribute("encoder"); + + NodeList fields = classElement.getElementsByTagName("field"); + for (int j = 0; j < fields.getLength(); j++) { + Element fieldElement = (Element) fields.item(j); + + String name = fieldElement.getAttribute("name"); + String family = fieldElement.getAttribute("family"); + String qualifier = fieldElement.getAttribute("qualifier"); + if ("".equals(qualifier)) + qualifier = null; + + Pair<Text,Text> col = new Pair<>(new Text(family), qualifier == null ? null : new Text(qualifier)); + mapping.fieldMap.put(name, col); + mapping.columnMap.put(col, name); + } + } + + } + + if (mapping.tableName == null) { + throw new GoraException("Please define the ignite 'table' name mapping in " + filename + " for " + persistentClass.getCanonicalName()); + } + + nl = root.getElementsByTagName("table"); + for (int i = 0; i < nl.getLength(); i++) { + Element tableElement = (Element) nl.item(i); + if (tableElement.getAttribute("name").equals(mapping.tableName)) { + NodeList configs = tableElement.getElementsByTagName("config"); + for (int j = 0; j < configs.getLength(); j++) { + Element configElement = (Element) configs.item(j); + String key = configElement.getAttribute("key"); + String val = configElement.getAttribute("value"); + mapping.tableConfig.put(key, val); + } + } + } + + return mapping; + } catch (Exception ex) { + throw new IOException("Unable to read " + filename, ex); + } + + } + + @Override + public String getSchemaName() { + return mapping.tableName; + } + + @Override + public void createSchema() throws GoraException { + try { + conn.tableOperations().create(mapping.tableName); + Set<Entry<String,String>> es = mapping.tableConfig.entrySet(); + for (Entry<String,String> entry : es) { + conn.tableOperations().setProperty(mapping.tableName, entry.getKey(), entry.getValue()); + } + + } catch (TableExistsException e) { + LOG.debug(e.getMessage(), e); + // Assume this is not an error + } catch (IgniteException | IgniteSecurityException e) { + throw new GoraException(e); + } + } + + @Override + public void deleteSchema() throws GoraException { + try { + if (batchWriter != null) + batchWriter.close(); + batchWriter = null; + conn.tableOperations().delete(mapping.tableName); + } catch (TableNotFoundException e) { + // Ignore. Delete a non existant schema is a success + } catch (IgniteException | IgniteSecurityException e) { + throw new GoraException(e); + } + } + + @Override + public boolean schemaExists() throws GoraException { + try { + return conn.tableOperations().exists(mapping.tableName); + } catch (Exception e) { + throw new GoraException(e); + } + } + + public ByteSequence populate(Iterator<Entry<Key,Value>> iter, T persistent) throws IOException { + ByteSequence row = null; + + Map<Utf8, Object> currentMap = null; + List currentArray = null; + Text currentFam = null; + int currentPos = 0; + Schema currentSchema = null; + Field currentField = null; + + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new byte[0], null); + + while (iter.hasNext()) { + Entry<Key,Value> entry = iter.next(); + + if (row == null) { + row = entry.getKey().getRowData(); + } + byte[] val = entry.getValue().get(); + + Field field = fieldMap.get(getFieldName(entry)); + + if (currentMap != null) { + if (currentFam.equals(entry.getKey().getColumnFamily())) { + currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), + fromBytes(currentSchema, entry.getValue().get())); + continue; + } else { + persistent.put(currentPos, currentMap); + currentMap = null; + } + } else if (currentArray != null) { + if (currentFam.equals(entry.getKey().getColumnFamily())) { + currentArray.add(fromBytes(currentSchema, entry.getValue().get())); + continue; + } else { + persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray)); + currentArray = null; + } + } + + switch (field.schema().getType()) { + case MAP: // first entry only. Next are handled above on the next loop + currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>()); + currentPos = field.pos(); + currentFam = entry.getKey().getColumnFamily(); + currentSchema = field.schema().getValueType(); + + currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), + fromBytes(currentSchema, entry.getValue().get())); + break; + case ARRAY: + currentArray = new DirtyListWrapper<>(new ArrayList<>()); + currentPos = field.pos(); + currentFam = entry.getKey().getColumnFamily(); + currentSchema = field.schema().getElementType(); + currentField = field; + + currentArray.add(fromBytes(currentSchema, entry.getValue().get())); + + break; + case UNION:// default value of null acts like union with null + Schema effectiveSchema = field.schema().getTypes() + .get(firstNotNullSchemaTypeIndex(field.schema())); + // map and array were coded without union index so need to be read the same way + if (effectiveSchema.getType() == Type.ARRAY) { + currentArray = new DirtyListWrapper<>(new ArrayList<>()); + currentPos = field.pos(); + currentFam = entry.getKey().getColumnFamily(); + currentSchema = field.schema().getElementType(); + currentField = field; + + currentArray.add(fromBytes(currentSchema, entry.getValue().get())); + break; + } + else if (effectiveSchema.getType() == Type.MAP) { + currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>()); + currentPos = field.pos(); + currentFam = entry.getKey().getColumnFamily(); + currentSchema = effectiveSchema.getValueType(); + + currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), + fromBytes(currentSchema, entry.getValue().get())); + break; + } + // continue like a regular top-level union + case RECORD: + SpecificDatumReader<?> reader = new SpecificDatumReader<Schema>(field.schema()); + persistent.put(field.pos(), reader.read(null, DecoderFactory.get().binaryDecoder(val, decoder))); + break; + default: + persistent.put(field.pos(), fromBytes(field.schema(), entry.getValue().get())); + } + } + + if (currentMap != null) { + persistent.put(currentPos, currentMap); + } else if (currentArray != null) { + persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray)); + } + + persistent.clearDirty(); + + return row; + } + + /** + * Retrieve field name from entry. + * @param entry The Key-Value entry + * @return String The field name + */ + private String getFieldName(Entry<Key, Value> entry) { + String fieldName = mapping.columnMap.get(new Pair<>(entry.getKey().getColumnFamily(), + entry.getKey().getColumnQualifier())); + if (fieldName == null) { + fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(), null)); + } + return fieldName; + } + + private void setFetchColumns(Scanner scanner, String[] fields) { + fields = getFieldsToQuery(fields); + for (String field : fields) { + Pair<Text,Text> col = mapping.fieldMap.get(field); + if (col != null) { + if (col.getSecond() == null) { + scanner.fetchColumnFamily(col.getFirst()); + } else { + scanner.fetchColumn(col.getFirst(), col.getSecond()); + } + } else { + LOG.error("Mapping not found for field: {}", field); + } + } + } + + @Override + public T get(K key, String[] fields) throws GoraException { + try { + // TODO make isolated scanner optional? + Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY)); + Range rowRange = new Range(new Text(toBytes(key))); + + scanner.setRange(rowRange); + setFetchColumns(scanner, fields); + + T persistent = newPersistent(); + ByteSequence row = populate(scanner.iterator(), persistent); + if (row == null) + return null; + return persistent; + } catch (Exception e) { + throw new GoraException(e); + } + } + + @Override + public void put(K key, T val) throws GoraException { + + try{ + Mutation m = new Mutation(new Text(toBytes(key))); + + Schema schema = val.getSchema(); + List<Field> fields = schema.getFields(); + int count = 0; + + for (int i = 0; i < fields.size(); i++) { + if (!val.isDirty(i)) { + continue; + } + Field field = fields.get(i); + + Object o = val.get(field.pos()); + + Pair<Text,Text> col = mapping.fieldMap.get(field.name()); + + if (col == null) { + throw new GoraException("Please define the gora to ignite mapping for field " + field.name()); + } + + switch (field.schema().getType()) { + case MAP: + count = putMap(m, count, field.schema().getValueType(), o, col, field.name()); + break; + case ARRAY: + count = putArray(m, count, o, col, field.name()); + break; + case UNION: // default value of null acts like union with null + Schema effectiveSchema = field.schema().getTypes() + .get(firstNotNullSchemaTypeIndex(field.schema())); + // map and array need to compute qualifier + if (effectiveSchema.getType() == Type.ARRAY) { + count = putArray(m, count, o, col, field.name()); + break; + } + else if (effectiveSchema.getType() == Type.MAP) { + count = putMap(m, count, effectiveSchema.getValueType(), o, col, field.name()); + break; + } + // continue like a regular top-level union + case RECORD: + final SpecificDatumWriter<Object> writer = new SpecificDatumWriter<>(field.schema()); + final byte[] byteData = IOUtils.serialize(writer,o); + m.put(col.getFirst(), col.getSecond(), new Value(byteData)); + count++; + break; + default: + m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o))); + count++; + } + + } + + if (count > 0) + try { + getBatchWriter().addMutation(m); + } catch (MutationsRejectedException e) { + LOG.error(e.getMessage(), e); + } + } catch (GoraException e) { + throw e; + } catch (Exception e) { + throw new GoraException(e); + } + } + + private int putMap(Mutation m, int count, Schema valueType, Object o, Pair<Text, Text> col, String fieldName) throws GoraException { + + // First of all we delete map field on ignite store + Text rowKey = new Text(m.getRow()); + Query<K, T> query = newQuery(); + query.setFields(fieldName); + query.setStartKey((K)rowKey.toString()); + query.setEndKey((K)rowKey.toString()); + deleteByQuery(query); + flush(); + if (o == null){ + return 0; + } + + Set<?> es = ((Map<?, ?>)o).entrySet(); + for (Object entry : es) { + Object mapKey = ((Entry<?, ?>) entry).getKey(); + Object mapVal = ((Entry<?, ?>) entry).getValue(); + if ((o instanceof DirtyMapWrapper && ((DirtyMapWrapper<?, ?>)o).isDirty()) + || !(o instanceof DirtyMapWrapper)) { + m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(valueType, mapVal))); + count++; + } + // TODO map value deletion + } + return count; + } + + private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col, String fieldName) throws GoraException { + + // First of all we delete array field on ignite store + Text rowKey = new Text(m.getRow()); + Query<K, T> query = newQuery(); + query.setFields(fieldName); + query.setStartKey((K)rowKey.toString()); + query.setEndKey((K)rowKey.toString()); + deleteByQuery(query); + flush(); + if (o == null){ + return 0; + } + + List<?> array = (List<?>) o; // both GenericArray and DirtyListWrapper + int j = 0; + for (Object item : array) { + m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item))); + count++; + } + return count; + } + + @Override + public boolean delete(K key) throws GoraException { + Query<K,T> q = newQuery(); + q.setKey(key); + return deleteByQuery(q) > 0; + } + + @Override + public long deleteByQuery(Query<K,T> query) throws GoraException { + try { + Scanner scanner = createScanner(query); + // add iterator that drops values on the server side + scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, SortedKeyIterator.class)); + RowIterator iterator = new RowIterator(scanner.iterator()); + + long count = 0; + + while (iterator.hasNext()) { + Iterator<Entry<Key,Value>> row = iterator.next(); + Mutation m = null; + while (row.hasNext()) { + Entry<Key,Value> entry = row.next(); + Key key = entry.getKey(); + if (m == null) + m = new Mutation(key.getRow()); + // TODO optimize to avoid continually creating column vis? prob does not matter for empty + m.putDelete(key.getColumnFamily(), key.getColumnQualifier(), new ColumnVisibility(key.getColumnVisibility()), key.getTimestamp()); + } + getBatchWriter().addMutation(m); + count++; + } + + return count; + } catch (Exception e) { + throw new GoraException(e); + } + } + + private Range createRange(Query<K,T> query) { + Text startRow = null; + Text endRow = null; + + if (query.getStartKey() != null) + startRow = new Text(toBytes(query.getStartKey())); + + if (query.getEndKey() != null) + endRow = new Text(toBytes(query.getEndKey())); + + return new Range(startRow, true, endRow, true); + + } + + private Scanner createScanner(Query<K,T> query) throws TableNotFoundException { + // TODO make isolated scanner optional? + Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY)); + setFetchColumns(scanner, query.getFields()); + + scanner.setRange(createRange(query)); + + if (query.getStartTime() != -1 || query.getEndTime() != -1) { + IteratorSetting is = new IteratorSetting(30, TimestampFilter.class); + if (query.getStartTime() != -1) + TimestampFilter.setStart(is, query.getStartTime(), true); + if (query.getEndTime() != -1) + TimestampFilter.setEnd(is, query.getEndTime(), true); + + scanner.addScanIterator(is); + } + + return scanner; + } + + /** + * Execute the query and return the result. + */ + @Override + public Result<K,T> execute(Query<K,T> query) throws GoraException { + try { + Scanner scanner = createScanner(query); + return new IgniteResult<>(this, query, scanner); + } catch (TableNotFoundException e) { + throw new GoraException(e) ; + } + } + + @Override + public Query<K,T> newQuery() { + return new IgniteQuery<>(this); + } + + Text pad(Text key, int bytes) { + if (key.getLength() < bytes) + key = new Text(key); + + while (key.getLength() < bytes) { + key.append(new byte[] {0}, 0, 1); + } + + return key; + } + + @Override + public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws GoraException { + try { + TabletLocator tl; + if (conn instanceof MockConnector) + tl = new MockTabletLocator(); + else + tl = TabletLocator.getLocator(new ClientContext(conn.getInstance(), credentials, IgniteConfiguration.getTableConfiguration(conn, Tables.getTableId(conn.getInstance(), mapping.tableName))), new Text(Tables.getTableId(conn.getInstance(), mapping.tableName))); + + Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>(); + + tl.invalidateCache(); + while (tl.binRanges(new ClientContext(conn.getInstance(), credentials, IgniteConfiguration.getTableConfiguration(conn, Tables.getTableId(conn.getInstance(), mapping.tableName))), Collections.singletonList(createRange(query)), binnedRanges).size() > 0) { + // TODO log? + if (!Tables.exists(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName))) + throw new TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName)); + else if (Tables.getTableState(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE) + throw new TableOfflineException(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)); + UtilWaitThread.sleep(100); + tl.invalidateCache(); + } + + List<PartitionQuery<K,T>> ret = new ArrayList<>(); + + Text startRow = null; + Text endRow = null; + if (query.getStartKey() != null) + startRow = new Text(toBytes(query.getStartKey())); + if (query.getEndKey() != null) + endRow = new Text(toBytes(query.getEndKey())); + + //hadoop expects hostnames, ignite keeps track of IPs... so need to convert + HashMap<String,String> hostNameCache = new HashMap<>(); + + for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) { + String ip = entry.getKey().split(":", 2)[0]; + String location = hostNameCache.get(ip); + if (location == null) { + InetAddress inetAddress = InetAddress.getByName(ip); + location = inetAddress.getHostName(); + hostNameCache.put(ip, location); + } + + Map<KeyExtent,List<Range>> tablets = entry.getValue(); + for (KeyExtent ke : tablets.keySet()) { + + K startKey = null; + if (startRow == null || !ke.contains(startRow)) { + if (ke.getPrevEndRow() != null) { + startKey = followingKey(encoder, getKeyClass(), getBytes(ke.getPrevEndRow())); + } + } else { + startKey = fromBytes(getKeyClass(), getBytes(startRow)); + } + + K endKey = null; + if (endRow == null || !ke.contains(endRow)) { + if (ke.getEndRow() != null) + endKey = lastPossibleKey(encoder, getKeyClass(), getBytes(ke.getEndRow())); + } else { + endKey = fromBytes(getKeyClass(), getBytes(endRow)); + } + + PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<>(query, startKey, endKey, location); + pqi.setConf(getConf()); + ret.add(pqi); + } + } + + return ret; + } catch (Exception e) { + throw new GoraException(e); + } + + } + + static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) { + + if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { + throw new UnsupportedOperationException(); + } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { + throw new UnsupportedOperationException(); + } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { + return fromBytes(encoder, clazz, encoder.lastPossibleKey(2, er)); + } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { + return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er)); + } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { + return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er)); + } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { + return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er)); + } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { + return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er)); + } else if (clazz.equals(String.class)) { + throw new UnsupportedOperationException(); + } else if (clazz.equals(Utf8.class)) { + return fromBytes(encoder, clazz, er); + } + + throw new IllegalArgumentException(UNKOWN + clazz.getName()); + } + + @SuppressWarnings("unchecked") + static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) { + + if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { + return (K) Byte.valueOf(encoder.followingKey(1, per)[0]); + } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { + throw new UnsupportedOperationException(); + } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { + return fromBytes(encoder, clazz, encoder.followingKey(2, per)); + } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { + return fromBytes(encoder, clazz, encoder.followingKey(4, per)); + } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { + return fromBytes(encoder, clazz, encoder.followingKey(8, per)); + } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { + return fromBytes(encoder, clazz, encoder.followingKey(4, per)); + } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { + return fromBytes(encoder, clazz, encoder.followingKey(8, per)); + } else if (clazz.equals(String.class)) { + throw new UnsupportedOperationException(); + } else if (clazz.equals(Utf8.class)) { + return fromBytes(encoder, clazz, Arrays.copyOf(per, per.length + 1)); + } + + throw new IllegalArgumentException(UNKOWN + clazz.getName()); + } + + @Override + public void flush() throws GoraException { + try { + if (batchWriter != null) { + batchWriter.flush(); + } + } catch (Exception e) { + throw new GoraException(e); + } + } + + @Override + public void close() { + try { + if (batchWriter != null) { + batchWriter.close(); + batchWriter = null; + } + } catch (MutationsRejectedException e) { + LOG.error(e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/store/package-info.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/store/package-info.java b/gora-ignite/src/main/java/org/apache/gora/ignite/store/package-info.java new file mode 100644 index 0000000..62cba67 --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/store/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains all the Ignite store related classes. + */ +package org.apache.gora.ignite.store; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/util/FixedByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/util/FixedByteArrayOutputStream.java b/gora-ignite/src/main/java/org/apache/gora/ignite/util/FixedByteArrayOutputStream.java new file mode 100644 index 0000000..97fb46a --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/util/FixedByteArrayOutputStream.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.ignite.util; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * It is a implementation of {@link java.io.OutputStream} must always provide at least a method that writes one byte of output. + */ +public class FixedByteArrayOutputStream extends OutputStream { + + private int i; + byte out[]; + + public FixedByteArrayOutputStream(byte out[]) { + this.out = out; + } + + @Override + public void write(int b) throws IOException { + out[i++] = (byte) b; + } + + @Override + public void write(byte b[], int off, int len) throws IOException { + System.arraycopy(b, off, out, i, len); + i += len; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/4346216f/gora-ignite/src/main/java/org/apache/gora/ignite/util/package-info.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/util/package-info.java b/gora-ignite/src/main/java/org/apache/gora/ignite/util/package-info.java new file mode 100644 index 0000000..eedb84b --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/util/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains Ignite store related util classes. + */ +package org.apache.gora.ignite.util; \ No newline at end of file
