Repository: gora Updated Branches: refs/heads/master 73250856a -> e8ed25deb
GORA-479 Various compilation errors in gora-infinispan module Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/e8ed25de Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/e8ed25de Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/e8ed25de Branch: refs/heads/master Commit: e8ed25deb75b4e9506bc6ac6121d4e7e7e5feaf6 Parents: 7325085 Author: Lewis John McGibbney <[email protected]> Authored: Wed Jun 1 10:13:55 2016 -0700 Committer: Lewis John McGibbney <[email protected]> Committed: Wed Jun 1 10:13:55 2016 -0700 ---------------------------------------------------------------------- .../org/apache/gora/hbase/store/HBaseStore.java | 5 +- gora-infinispan/pom.xml | 35 +- .../gora/infinispan/store/InfinispanClient.java | 2 +- .../gora/infinispan/store/InfinispanStore.java | 479 ++++++++++--------- 4 files changed, 275 insertions(+), 246 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/e8ed25de/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java ---------------------------------------------------------------------- diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java index 0e07e15..00fe60b 100644 --- a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java +++ b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java @@ -100,7 +100,10 @@ implements Configurable { private int scannerCaching = SCANNER_CACHING_PROPERTIES_DEFAULT ; - public HBaseStore() { + /** + * Default constructor + */ + public HBaseStore() {//Empty Constrctor } @Override http://git-wip-us.apache.org/repos/asf/gora/blob/e8ed25de/gora-infinispan/pom.xml ---------------------------------------------------------------------- diff --git a/gora-infinispan/pom.xml b/gora-infinispan/pom.xml index a8307a3..f101f76 100644 --- a/gora-infinispan/pom.xml +++ b/gora-infinispan/pom.xml @@ -29,15 +29,27 @@ <name>Apache Gora :: Infinispan</name> <url>http://gora.apache.org</url> - <description> - The Apache Gora open source framework provides an in-memory data model and - persistence for big data. Gora supports persisting to column stores, key value stores, - document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce - support. This module provides a Gora support for the Infinispan storage system. - </description> - <inceptionYear>2015</inceptionYear> + <description>The Apache Gora open source framework provides an in-memory data model and + persistence for big data. Gora supports persisting to column stores, key value stores, + document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce + support.</description> + <inceptionYear>2010</inceptionYear> + <organization> + <name>The Apache Software Foundation</name> + <url>http://www.apache.org/</url> + </organization> + <issueManagement> + <system>JIRA</system> + <url>https://issues.apache.org/jira/browse/GORA</url> + </issueManagement> + <ciManagement> + <system>Jenkins</system> + <url>https://builds.apache.org/job/Gora-trunk/</url> + </ciManagement> <properties> + <osgi.import>*</osgi.import> + <osgi.export>org.apache.gora.infinispan*;version="${project.version}";-noimport:=true</osgi.export> <infinispan.version>7.2.5.Final</infinispan.version> <infinispan.avro.version>1.0</infinispan.avro.version> <infinispan.avro.server.client.version>1.0.Final</infinispan.avro.server.client.version> @@ -109,6 +121,15 @@ <dependency> <groupId>org.apache.gora</groupId> <artifactId>gora-core</artifactId> + <scope>compile</scope> + </dependency> + <!-- End of internal dependencies --> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>compile</scope> + <optional>true</optional> </dependency> <!-- Infinispan Dependencies --> http://git-wip-us.apache.org/repos/asf/gora/blob/e8ed25de/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanClient.java ---------------------------------------------------------------------- diff --git a/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanClient.java b/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanClient.java index ccd45e3..ab0a64b 100644 --- a/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanClient.java +++ b/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanClient.java @@ -75,7 +75,7 @@ public class InfinispanClient<K, T extends PersistentBase> implements Configurab properties.setProperty(ISPN_CONNECTION_STRING_KEY, host); LOG.info("Connecting client to "+host); - Marshaller<T> marshaller = new Marshaller<T>(persistentClass); + Marshaller<T> marshaller = new Marshaller<>(persistentClass); ConfigurationBuilder builder = new ConfigurationBuilder(); builder.addServers(host); builder.marshaller(marshaller); http://git-wip-us.apache.org/repos/asf/gora/blob/e8ed25de/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java ---------------------------------------------------------------------- diff --git a/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java b/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java index 488e477..cb198ff 100644 --- a/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java +++ b/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java @@ -47,251 +47,256 @@ import static org.apache.gora.mapreduce.GoraRecordReader.BUFFER_LIMIT_READ_VALUE */ public class InfinispanStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { - public static final Logger LOG = LoggerFactory.getLogger(InfinispanStore.class); + public static final Logger LOG = LoggerFactory.getLogger(InfinispanStore.class); - private InfinispanClient<K, T> infinispanClient; - private String primaryFieldName; - private int primaryFieldPos; - private int splitSize; + private InfinispanClient<K, T> infinispanClient; + private String primaryFieldName; + private int primaryFieldPos; + private int splitSize; - public InfinispanStore() throws Exception {} + /** + * Default constructor + */ + public InfinispanStore(){ + //Empty default constructor + } - @Override - public synchronized void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { + @Override + public synchronized void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { - try { + try { - if (primaryFieldName!=null) { - LOG.info("Client already initialized; ignoring."); - return; - } - - super.initialize(keyClass, persistentClass, properties); - infinispanClient = new InfinispanClient<>(); - infinispanClient.setConf(conf); - - LOG.info("key class: " - + keyClass.getCanonicalName() - + ", persistent class: " - + persistentClass.getCanonicalName()); - schema = persistentClass.newInstance().getSchema(); - - splitSize = Integer.valueOf( - properties.getProperty( BUFFER_LIMIT_READ_NAME, - getConf().get( - BUFFER_LIMIT_READ_NAME, - Integer.toString(BUFFER_LIMIT_READ_VALUE)))); - LOG.info("split size: "+splitSize); - - primaryFieldPos = 0; - primaryFieldName = schema.getFields().get(0).name(); - this.infinispanClient.initialize(keyClass, persistentClass, properties); - - } catch (Exception e) { - throw new RuntimeException(e); + if (primaryFieldName!=null) { + LOG.info("Client already initialized; ignoring."); + return; } - } - - @Override - public void close() { - LOG.debug("close()"); - infinispanClient.close(); - } - - @Override - public void createSchema() { - LOG.debug("createSchema()"); - this.infinispanClient.createCache(); - } - - @Override - public boolean delete(K key) { - LOG.debug("delete(" + key+")"); - this.infinispanClient.deleteByKey(key); - return true; - } - - @Override - public long deleteByQuery(Query<K, T> query) { - ((InfinispanQuery<K, T>) query).build(); - LOG.debug("deleteByQuery("+query.toString()+")"); - InfinispanQuery<K, T> q = (InfinispanQuery) query; - q.build(); - for( T t : q.list()){ - infinispanClient.deleteByKey((K) t.get(primaryFieldPos)); - } - return q.getResultSize(); - } - - @Override - public void deleteSchema() { - LOG.debug("deleteSchema()"); - this.infinispanClient.dropCache(); - } - - @Override - public Result<K, T> execute(Query<K, T> query) { - LOG.debug("execute()"); - ((InfinispanQuery<K,T>)query).build(); - InfinispanResult<K,T> result = new InfinispanResult<>(this, (InfinispanQuery<K,T>)query); - LOG.trace("query: " + query.toString()); - LOG.trace("result size: " + result.size()); - return result; - } - - @Override - public T get(K key){ - LOG.debug("get("+key+")"); + + super.initialize(keyClass, persistentClass, properties); + infinispanClient = new InfinispanClient<>(); + infinispanClient.setConf(conf); + + LOG.info("key class: " + + keyClass.getCanonicalName() + + ", persistent class: " + + persistentClass.getCanonicalName()); + schema = persistentClass.newInstance().getSchema(); + + splitSize = Integer.valueOf( + properties.getProperty( BUFFER_LIMIT_READ_NAME, + getConf().get( + BUFFER_LIMIT_READ_NAME, + Integer.toString(BUFFER_LIMIT_READ_VALUE)))); + LOG.info("split size: "+splitSize); + + primaryFieldPos = 0; + primaryFieldName = schema.getFields().get(0).name(); + this.infinispanClient.initialize(keyClass, persistentClass, properties); + + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + LOG.debug("close()"); + infinispanClient.close(); + } + + @Override + public void createSchema() { + LOG.debug("createSchema()"); + this.infinispanClient.createCache(); + } + + @Override + public boolean delete(K key) { + LOG.debug("delete(" + key+")"); + this.infinispanClient.deleteByKey(key); + return true; + } + + @Override + public long deleteByQuery(Query<K, T> query) { + ((InfinispanQuery<K, T>) query).build(); + LOG.debug("deleteByQuery("+query.toString()+")"); + InfinispanQuery<K, T> q = (InfinispanQuery) query; + q.build(); + for( T t : q.list()){ + infinispanClient.deleteByKey((K) t.get(primaryFieldPos)); + } + return q.getResultSize(); + } + + @Override + public void deleteSchema() { + LOG.debug("deleteSchema()"); + this.infinispanClient.dropCache(); + } + + @Override + public Result<K, T> execute(Query<K, T> query) { + LOG.debug("execute()"); + ((InfinispanQuery<K,T>)query).build(); + InfinispanResult<K,T> result = new InfinispanResult<>(this, (InfinispanQuery<K,T>)query); + LOG.trace("query: " + query.toString()); + LOG.trace("result size: " + result.size()); + return result; + } + + @Override + public T get(K key){ + LOG.debug("get("+key+")"); + return infinispanClient.get(key); + } + + @Override + public T get(K key, String[] fields) { + LOG.debug("get("+key+","+fields+")"); + if (fields==null) return infinispanClient.get(key); - } - - @Override - public T get(K key, String[] fields) { - LOG.debug("get("+key+","+fields+")"); - if (fields==null) - return infinispanClient.get(key); - - InfinispanQuery query = new InfinispanQuery(this); - query.setKey(key); - query.setFields(fields); - query.build(); - - - Result<K,T> result = query.execute(); - try { - result.next(); - return result.get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - /** - * - * Split the query per infinispan node resulting in a list of queries. - * For each Infinispan server, this function returns a set of qeuries - * using pagination of the originial query. The size of each query - * in this pagination equals <i>gora.buffer.read.limit</i>. - * - * @param query the base query to create the partitions for. If the query - * is null, then the data store returns the partitions for the default query - * (returning every object) - * @return - * @throws IOException - */ - @Override - public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) - throws IOException { - LOG.debug("getPartitions()"); - - // 1 - split the query per location - List<PartitionQuery<K,T>> locations = ((InfinispanQuery<K,T>)query).split(); - - // 2 -split each location - List<PartitionQuery<K,T>> splitLocations = new ArrayList<>(); - for(PartitionQuery<K,T> location : locations) { - - LOG.trace("location: "+ ((InfinispanQuery)location).getLocation().toString()); - - // 2.1 - compute the result size - InfinispanQuery<K,T> sizeQuery = (InfinispanQuery<K, T>) ((InfinispanQuery<K, T>) location).clone(); - sizeQuery.setFields(primaryFieldName); - sizeQuery.setLimit(1); - sizeQuery.rebuild(); - - // 2.2 - check if splitting is necessary - int resultSize = sizeQuery.getResultSize(); - long queryLimit = query.getLimit(); - long splitLimit = queryLimit>0 ? Math.min((long)resultSize,queryLimit) : resultSize; - LOG.trace("split limit: "+ splitLimit); - LOG.trace("split size: "+ splitSize); - if (splitLimit <= splitSize) { - LOG.trace("location returned"); - splitLocations.add(location); - continue; - } - - // 2.3 - compute the splits - for(int i=0; i<Math.ceil((double)splitLimit/(double)splitSize); i++) { - InfinispanQuery<K, T> split = (InfinispanQuery<K, T>) ((InfinispanQuery<K, T>) location).clone(); - split.setOffset(i * splitSize); - split.setLimit(splitSize); - split.rebuild(); - splitLocations.add(split); - } + + InfinispanQuery<K, T> query = new InfinispanQuery<K, T>(this); + query.setKey(key); + query.setFields(fields); + query.build(); + + + Result<K,T> result = query.execute(); + try { + result.next(); + return result.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * + * Split the query per infinispan node resulting in a list of queries. + * For each Infinispan server, this function returns a set of qeuries + * using pagination of the originial query. The size of each query + * in this pagination equals <i>gora.buffer.read.limit</i>. + * + * @param query the base query to create the partitions for. If the query + * is null, then the data store returns the partitions for the default query + * (returning every object) + * @return + * @throws IOException + */ + @Override + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) + throws IOException { + LOG.debug("getPartitions()"); + + // 1 - split the query per location + List<PartitionQuery<K,T>> locations = ((InfinispanQuery<K,T>)query).split(); + + // 2 -split each location + List<PartitionQuery<K,T>> splitLocations = new ArrayList<>(); + for(PartitionQuery<K,T> location : locations) { + + LOG.trace("location: "+ ((InfinispanQuery<K, T>)location).getLocation().toString()); + + // 2.1 - compute the result size + InfinispanQuery<K,T> sizeQuery = (InfinispanQuery<K, T>) ((InfinispanQuery<K, T>) location).clone(); + sizeQuery.setFields(primaryFieldName); + sizeQuery.setLimit(1); + sizeQuery.rebuild(); + + // 2.2 - check if splitting is necessary + int resultSize = sizeQuery.getResultSize(); + long queryLimit = query.getLimit(); + long splitLimit = queryLimit>0 ? Math.min((long)resultSize,queryLimit) : resultSize; + LOG.trace("split limit: "+ splitLimit); + LOG.trace("split size: "+ splitSize); + if (splitLimit <= splitSize) { + LOG.trace("location returned"); + splitLocations.add(location); + continue; } - return splitLocations; - } - - @Override - public void flush() { - LOG.debug("flush()"); - infinispanClient.flush(); - } - - /** - * In Infinispan, Schemas are referred to as caches. - * - * @return Cache - */ - @Override - public String getSchemaName() { - LOG.debug("getSchemaName()"); - return this.infinispanClient.getCacheName(); - } - - @Override - public Query<K, T> newQuery() { - LOG.debug("newQuery()"); - Query<K, T> query = new InfinispanQuery<K, T>(this); - query.setFields(getFieldsToQuery(null)); - return query; - } - - @Override - public void put(K key, T obj) { - LOG.debug("put(" +key.toString()+")"); - LOG.trace(obj.toString()); - - if (obj.get(primaryFieldPos)==null) - obj.put(primaryFieldPos,key); - - if (!obj.get(primaryFieldPos).equals(key) ) - LOG.warn("Invalid or different primary field :"+key+"<->"+obj.get(primaryFieldPos)); - - this.infinispanClient.put(key, obj); - } - - @Override - public boolean schemaExists() { - LOG.debug("schemaExists()"); - return infinispanClient.cacheExists(); - } - - public InfinispanClient<K, T> getClient() { - LOG.debug("getClient()"); - return infinispanClient; - } - - public String getPrimaryFieldName() { - LOG.debug("getPrimaryField()"); - return primaryFieldName; - } - - public void setPrimaryFieldName(String name){ - LOG.debug("getPrimaryFieldName()"); - primaryFieldName = name; - } - - public int getPrimaryFieldPos(){ - LOG.debug("getPrimaryFieldPos()"); - return primaryFieldPos; - } - - public void setPrimaryFieldPos(int p){ - LOG.debug("setPrimaryFieldPos()"); - primaryFieldPos = p; - } + // 2.3 - compute the splits + for(int i=0; i<Math.ceil((double)splitLimit/(double)splitSize); i++) { + InfinispanQuery<K, T> split = (InfinispanQuery<K, T>) ((InfinispanQuery<K, T>) location).clone(); + split.setOffset(i * splitSize); + split.setLimit(splitSize); + split.rebuild(); + splitLocations.add(split); + } + } + + return splitLocations; + } + + @Override + public void flush() { + LOG.debug("flush()"); + infinispanClient.flush(); + } + + /** + * In Infinispan, Schemas are referred to as caches. + * + * @return Cache + */ + @Override + public String getSchemaName() { + LOG.debug("getSchemaName()"); + return this.infinispanClient.getCacheName(); + } + + @Override + public Query<K, T> newQuery() { + LOG.debug("newQuery()"); + Query<K, T> query = new InfinispanQuery<>(this); + query.setFields(getFieldsToQuery(null)); + return query; + } + + @Override + public void put(K key, T obj) { + LOG.debug("put(" +key.toString()+")"); + LOG.trace(obj.toString()); + + if (obj.get(primaryFieldPos)==null) + obj.put(primaryFieldPos,key); + + if (!obj.get(primaryFieldPos).equals(key) ) + LOG.warn("Invalid or different primary field :"+key+"<->"+obj.get(primaryFieldPos)); + + this.infinispanClient.put(key, obj); + } + + @Override + public boolean schemaExists() { + LOG.debug("schemaExists()"); + return infinispanClient.cacheExists(); + } + + public InfinispanClient<K, T> getClient() { + LOG.debug("getClient()"); + return infinispanClient; + } + + public String getPrimaryFieldName() { + LOG.debug("getPrimaryField()"); + return primaryFieldName; + } + + public void setPrimaryFieldName(String name){ + LOG.debug("getPrimaryFieldName()"); + primaryFieldName = name; + } + + public int getPrimaryFieldPos(){ + LOG.debug("getPrimaryFieldPos()"); + return primaryFieldPos; + } + + public void setPrimaryFieldPos(int p){ + LOG.debug("setPrimaryFieldPos()"); + primaryFieldPos = p; + } }
