Repository: incubator-apex-malhar Updated Branches: refs/heads/master eaf74392c -> cec33da88
APEXMALHAR-1942 #comment Added GeodePOJOOutput operator, GeodeStore implementation & unit tests Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/cec33da8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/cec33da8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/cec33da8 Branch: refs/heads/master Commit: cec33da888a33e60bd62221c3c34e0dc4523d9f1 Parents: eaf7439 Author: prasi-in <[email protected]> Authored: Tue Feb 9 01:40:25 2016 +0530 Committer: prasi-in <[email protected]> Committed: Tue Feb 9 01:40:25 2016 +0530 ---------------------------------------------------------------------- contrib/pom.xml | 8 +- .../geode/AbstractGeodeInputOperator.java | 43 +++ .../geode/AbstractGeodeOutputOperator.java | 44 +++ .../contrib/geode/GeodePOJOOutputOperator.java | 70 +++++ .../datatorrent/contrib/geode/GeodeStore.java | 298 +++++++++++++++++++ .../contrib/geode/GeodeOperatorTest.java | 69 +++++ .../contrib/geode/GeodePOJOOperatorTest.java | 102 +++++++ .../contrib/geode/GeodeStoreTest.java | 110 +++++++ 8 files changed, 743 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index 17b6008..6145fac 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -616,6 +616,12 @@ <artifactId>apex-common</artifactId> <version>${apex.core.version}</version> <type>jar</type> - </dependency> + </dependency> + <dependency> + <groupId>org.apache.geode</groupId> + <artifactId>gemfire-core</artifactId> + <version>1.0.0-incubating.M1</version> + <optional>true</optional> + </dependency> </dependencies> </project> 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java new file mode 100644 index 0000000..7595e1a --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.geode; + +import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator; + +/** + * This is the base implementation used for geode input adapters. A + * concrete operator should be created from this skeleton implementation. + * <p> + * </p> + * + * @displayName Abstract Geode Input + * @category Input + * @tags geode, key value + * + * @param <T> + * The tuple type. 
+ * + */ +public abstract class AbstractGeodeInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, GeodeStore> +{ + /** + * Creates the input operator and assigns it a new {@link GeodeStore}; locator and region + * still have to be set on that store. + */ + public AbstractGeodeInputOperator() + { + this.store = new GeodeStore(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java new file mode 100644 index 0000000..157fbf4 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.geode; + +import com.datatorrent.lib.db.AbstractStoreOutputOperator; + +/** + * This is the base implementation of geode output operators. A concrete + * operator should be created from this skeleton implementation. + * <p> + * </p> + * + * @displayName Abstract Geode Output + * @category Output + * @tags geode, key value + * + * @param <T> + * The tuple type. 
+ * + */ +public abstract class AbstractGeodeOutputOperator<T> extends AbstractStoreOutputOperator<T, GeodeStore> +{ + /** + * Creates the output operator and assigns it a new {@link GeodeStore}; locator and region + * still have to be set on that store. + */ + public AbstractGeodeOutputOperator() + { + this.store = new GeodeStore(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java new file mode 100644 index 0000000..ad7e90c --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.geode; + +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.Getter; +import com.datatorrent.lib.util.TableInfo; + +/** + * Output operator that puts every incoming tuple into the store, keyed by the String obtained from + * evaluating the row/id expression of the configured {@link TableInfo} on that tuple. + * + * @displayName Geode Output Operator + * @category Output + * @tags pojo, geode + * + */ +@Evolving +public class GeodePOJOOutputOperator extends AbstractGeodeOutputOperator<Object> +{ + + private TableInfo<FieldInfo> tableInfo; + private transient Getter<Object, String> rowGetter; + + /** + * Stores the tuple under its row key. The key getter is created on first use from the class of + * the tuple being processed and is reused for all later tuples. + * + * @param tuple + * the POJO to store + */ + @Override + public void processTuple(Object tuple) + { + if (rowGetter == null) { + final Class<?> tupleClass = tuple.getClass(); + rowGetter = PojoUtils.createGetter(tupleClass, tableInfo.getRowOrIdExpression(), String.class); + } + + final String rowKey = rowGetter.get(tuple); + getStore().put(rowKey, tuple); + } + + /** + * @return the table info whose row/id expression is used to derive the key of each tuple + */ + public TableInfo<FieldInfo> getTableInfo() + { + return this.tableInfo; + } + + /** + * @param tableInfo + * the table info whose row/id expression is used to derive the key of each tuple + */ + public void setTableInfo(TableInfo<FieldInfo> tableInfo) + { + this.tableInfo = tableInfo; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java new file mode 100644 index 0000000..14cb5a5 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.geode; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.accumulo.core.client.impl.thrift.ThriftTest.Processor.throwsError; + +import com.gemstone.gemfire.cache.CacheClosedException; +import com.gemstone.gemfire.cache.CacheWriterException; +import com.gemstone.gemfire.cache.EntryNotFoundException; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.TimeoutException; +import com.gemstone.gemfire.cache.client.ClientCache; +import com.gemstone.gemfire.cache.client.ClientCacheFactory; +import com.gemstone.gemfire.cache.client.ClientRegionShortcut; +import com.gemstone.gemfire.cache.query.FunctionDomainException; +import com.gemstone.gemfire.cache.query.NameResolutionException; +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException; +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.cache.query.TypeMismatchException; + +import com.datatorrent.lib.db.KeyValueStore; + +/** + * Provides the implementation of a Geode store. + * Geode is a distributed in-memory database + * that provides reliable asynchronous event notifications and guaranteed message delivery. 
+ * Geode is a data management platform that provides real-time, + * consistent access to data-intensive applications. + * + */ +public class GeodeStore implements KeyValueStore, Serializable +{ + /** + * + */ + private static final long serialVersionUID = -5076452548893319967L; + private static final Logger logger = LoggerFactory.getLogger(GeodeStore.class); + private transient ClientCache clientCache = null; + private transient Region<Object, Object> region = null; + private String locatorHost; + private int locatorPort; + private String regionName; + + /** + * Creates the client cache through the configured locator and remembers it in {@code clientCache}. + * + * @return the newly created client cache + */ + private ClientCache initClient() + { + try { + clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create(); + } catch (CacheClosedException ex) { + throw new RuntimeException("Exception while creating cache", ex); + + } + + return clientCache; + } + + /** + * @return the regionName + */ + public String getRegionName() + { + return regionName; + } + + + /** + * @return the clientCache + */ + public ClientCache getClientCache() + { + return clientCache; + } + + /** + * @return the locatorPort + */ + public int getLocatorPort() + { + return locatorPort; + } + + /** + * @param locatorPort + * the locatorPort to set + */ + public void setLocatorPort(int locatorPort) + { + this.locatorPort = locatorPort; + } + + /** + * @return the locatorHost + */ + public String getLocatorHost() + { + return locatorHost; + } + + /** + * @param locatorHost + * the locatorHost to set + */ + public void setLocatorHost(String locatorHost) + { + this.locatorHost = locatorHost; + } + + /** + * Returns the region named {@code regionName}, creating the client cache first when it is + * missing or closed. The cached region handle is dropped whenever the cache is re-created, + * because it belongs to the previous, closed cache. If the cache does not know the region yet, + * a {@code PROXY} client region is created. + * + * @return the region + * @throws IOException + * kept in the signature for existing callers; this method does not throw it itself + */ + public Region<Object, Object> getRegion() throws IOException + { + if (clientCache == null || clientCache.isClosed()) { + initClient(); + /* the old handle (if any) points into the closed cache, so force a fresh lookup */ + region = null; + } + + if (region == null) { + region = clientCache.getRegion(regionName); + if (region == null) { + region = clientCache.<Object, Object> 
createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName); + } + } + + return region; + } + + /** + * Creates the client cache through the configured locator and looks up the configured region, + * creating it as a {@code PROXY} client region when the cache does not know it yet. + * + * @throws RuntimeException + * wrapping any exception raised while creating the client cache + */ + @Override + public void connect() throws IOException + { + try { + clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + + region = clientCache.getRegion(getRegionName()); + + if (region == null) { + region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create( + getRegionName()); + } + + } + + /** + * Closes the client cache if one was created and forgets the cached region handle. Calling this + * before {@link #connect()} does nothing instead of failing with a {@link NullPointerException}. + */ + @Override + public void disconnect() throws IOException + { + if (clientCache != null) { + clientCache.close(); + } + region = null; + + } + + /** + * @return {@code true} when a client cache exists and has not been closed + */ + @Override + public boolean isConnected() + { + return clientCache != null && !clientCache.isClosed(); + + } + + /** + * Gets the value given the key. Note that it does NOT work with hash values + * or list values + * + * @param key + * @return The value. + */ + @Override + public Object get(Object key) + { + + try { + return (getRegion().get(key)); + } catch (IOException ex) { + throw new RuntimeException("Exception while getting the object", ex); + + } + + } + + /** + * Gets all the values given the keys. Note that it does NOT work with hash + * values or list values + * + * @param keys + * @return All values for the given keys. 
+ */ + @SuppressWarnings("unchecked") + @Override + public List<Object> getAll(List<Object> keys) + { + + List<Object> values = new ArrayList<Object>(); + + try { + final Map<Object, Object> entries = getRegion().getAll(keys); + for (int i = 0; i < keys.size(); i++) { + values.add(entries.get(keys.get(i))); + } + } catch (IOException ex) { + logger.info("error getting region ", ex); + } + + return (values); + + } + + /** + * @param regionName + * the regionName to set + */ + public void setRegionName(String regionName) + { + this.regionName = regionName; + } + + + /** + * Fetches the entries for the given keys in one call. + * + * @param keys + * the keys to look up + * @return the key/value map returned by the region, or {@code null} when the region could not be obtained + */ + public Map<Object, Object> getAllMap(List<Object> keys) + { + + try { + final Map<Object, Object> entries = getRegion().getAll(keys); + return (entries); + } catch (IOException ex) { + logger.info("error getting object ", ex); + return null; + } + + } + + /** + * Runs the given query predicate against the region. + * + * @param predicate + * the query string handed to the region + * @return the query results, or {@code null} when the query failed (the failure is logged) + */ + @SuppressWarnings("rawtypes") + public SelectResults query(String predicate) + { + try { + return (getRegion().query(predicate)); + } catch (FunctionDomainException | TypeMismatchException | NameResolutionException | QueryInvocationTargetException + | IOException e) { + logger.info("error in querying object ", e); + return null; + } + + } + + /** + * Stores the value under the key in the region. + * + * @throws RuntimeException + * if the region cannot be obtained, so a failed write is reported like a failed {@link #get(Object)} + * instead of being logged and dropped + */ + @Override + public void put(Object key, Object value) + { + try { + getRegion().put(key, value); + } catch (IOException e) { + throw new RuntimeException("Exception while putting the object", e); + } + } + + /** + * Stores all entries of the map in the region. + * + * @throws RuntimeException + * if the region cannot be obtained, so a failed write is reported like a failed {@link #get(Object)} + * instead of being logged and dropped + */ + @Override + public void putAll(Map<Object, Object> map) + { + try { + getRegion().putAll(map); + } catch (IOException e) { + throw new RuntimeException("Exception while putting the objects", e); + } + } + + /** + * Destroys the entry for the key. This is best effort: a missing entry, a timeout or a cache + * writer failure is logged and otherwise ignored. + */ + @Override + public void remove(Object key) + { + try { + getRegion().destroy(key); + } catch (TimeoutException | CacheWriterException | EntryNotFoundException | IOException e) { + logger.info("while deleting", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeOperatorTest.java ---------------------------------------------------------------------- diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeOperatorTest.java new file mode 100644 index 0000000..5f06391 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeOperatorTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.geode; + +import org.junit.Before; +import org.junit.Test; + +import com.datatorrent.lib.db.KeyValueStoreOperatorTest; + +/** + * Runs the shared {@link KeyValueStoreOperatorTest} input and output operator checks against two + * {@link GeodeStore} instances configured with the same locator and region. + */ +public class GeodeOperatorTest +{ + KeyValueStoreOperatorTest<GeodeStore> testFramework; + + private GeodeStore geodeStore; + private GeodeStore testStore; + + /** + * Builds both stores. The locator host comes from the {@code dev.locator.connection} system + * property when it is set; otherwise the built-in address is used. + */ + @Before + public void setup() + { + final String overrideHost = System.getProperty("dev.locator.connection"); + final String locatorHost = overrideHost == null ? "192.168.1.128" : overrideHost; + + geodeStore = newOperatorStore(locatorHost); + testStore = newOperatorStore(locatorHost); + testFramework = new KeyValueStoreOperatorTest<GeodeStore>(geodeStore, testStore); + } + + /** + * Creates a store for the {@code operator} region on the given locator host. + */ + private static GeodeStore newOperatorStore(String locatorHost) + { + final GeodeStore created = new GeodeStore(); + created.setLocatorHost(locatorHost); + created.setLocatorPort(50505); + created.setRegionName("operator"); + return created; + } + + /** + * Delegates to the output operator check of the shared test framework. + */ + @Test + public void testOutputOperator() throws Exception + { + testFramework.testOutputOperator(); + } + + /** + * Delegates to the input operator check of the shared test framework. + */ + @Test + public void testInputOperator() throws Exception + { + testFramework.testInputOperator(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/test/java/com/datatorrent/contrib/geode/GeodePOJOOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodePOJOOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodePOJOOperatorTest.java new file mode 100644 index 0000000..0cacc02 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodePOJOOperatorTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.geode; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.datatorrent.contrib.util.TestPOJO; +import com.datatorrent.contrib.util.TupleGenerator; +import com.datatorrent.lib.util.TableInfo; +import com.datatorrent.netlet.util.DTThrowable; + +@SuppressWarnings("rawtypes") +public class GeodePOJOOperatorTest +{ + public static final int TUPLE_SIZE = 10; + + private GeodeStore store; + + @Before + public void setup() + { + store = new GeodeStore(); + store.setLocatorHost("192.168.1.128"); + if (System.getProperty("dev.locator.connection") != null) { + store.setLocatorHost(System.getProperty("dev.locator.connection")); + } + store.setLocatorPort(10334); + store.setRegionName("operator5"); + } + + public void cleanup() + { + if (store != null) { + try { + store.disconnect(); + } catch (Exception e) { + DTThrowable.rethrow(e); + } + } + + } + + @SuppressWarnings("unchecked") + @Test + public void testGeodeOutputOperatorInternal() throws Exception + { + GeodePOJOOutputOperator operator = new GeodePOJOOutputOperator(); + operator.setStore(store); + + TableInfo tableInfo = new TableInfo(); + tableInfo.setRowOrIdExpression(TestPOJO.getRowExpression()); + tableInfo.setFieldsInfo(TestPOJO.getFieldsInfo()); + tableInfo.setRowOrIdExpression(TestPOJO.getRowExpression()); + operator.setTableInfo(tableInfo); + + 
operator.setup(null); + + TupleGenerator<TestPOJO> generator = new TupleGenerator<TestPOJO>(TestPOJO.class); + + for (int i = 0; i < TUPLE_SIZE; ++i) { + operator.processTuple(generator.getNextTuple()); + } + + generator.reset(); + + for (int i = 0; i < TUPLE_SIZE; ++i) { + operator.processTuple(generator.getNextTuple()); + } + + // readDataAndVerify(operator.getStore(), generator); + } + + public void readDataAndVerify(GeodeStore store, TupleGenerator<TestPOJO> generator) + { + generator.reset(); + + for (int i = 0; i < TUPLE_SIZE; ++i) { + TestPOJO expected = generator.getNextTuple(); + TestPOJO read = (TestPOJO)store.get(expected.getRow()); + Assert.assertTrue(String.format("expected={%s}, actually={%s}", expected.toString(), read.toString()), + expected.completeEquals(read)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeStoreTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeStoreTest.java b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeStoreTest.java new file mode 100644 index 0000000..cad6876 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeStoreTest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.geode; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.gemstone.gemfire.cache.query.SelectResults; + +/** + * Exercises put, putAll, get, getAllMap and query of {@link GeodeStore} against the + * {@code operator} region. Each test uses its own key prefix. + */ +public class GeodeStoreTest +{ + + private GeodeStore geodeStore; + + /** + * Configures and connects the store. The locator host comes from the + * {@code dev.locator.connection} system property when it is set. + */ + @Before + public void setup() throws IOException + { + final String overrideHost = System.getProperty("dev.locator.connection"); + + geodeStore = new GeodeStore(); + geodeStore.setLocatorHost(overrideHost != null ? overrideHost : "192.168.1.128"); + geodeStore.setLocatorPort(10334); + geodeStore.setRegionName("operator"); + geodeStore.connect(); + } + + /** + * Disconnects the store after each test. + */ + @After + public void tearDown() throws IOException + { + geodeStore.disconnect(); + } + + /** + * Writes two entries with putAll and reads each back with get. + */ + @Test + public void testputAllandget() throws Exception + { + final Map<Object, Object> written = new HashMap<Object, Object>(); + written.put("test1_abc", "123"); + written.put("test1_def", "456"); + + geodeStore.putAll(written); + + Assert.assertEquals("123", geodeStore.get("test1_abc")); + Assert.assertEquals("456", geodeStore.get("test1_def")); + } + + /** + * Writes two entries and expects the query on the {@code test2} key prefix to return two results. + */ + @Test + public void testQuery() throws Exception + { + final Map<Object, Object> written = new HashMap<Object, Object>(); + written.put("test2_abc", "123"); + written.put("test2_def", "456"); + geodeStore.putAll(written); + + final SelectResults matches = + geodeStore.query("Select key,value from /operator.entries where key like 'test2%'"); + Assert.assertEquals(2, 
matches.size()); + } + + /** + * Writes two entries with putAll and reads both back in one call through getAllMap. + */ + @Test + public void testputAllandgetAll() throws Exception + { + final Map<Object, Object> written = new HashMap<Object, Object>(); + written.put("test3_abc", "123"); + written.put("test3_def", "456"); + geodeStore.putAll(written); + + final List<Object> lookupKeys = new ArrayList<Object>(); + lookupKeys.add("test3_abc"); + lookupKeys.add("test3_def"); + final Map<Object, Object> fetched = geodeStore.getAllMap(lookupKeys); + + Assert.assertEquals("123", fetched.get("test3_abc")); + Assert.assertEquals("456", fetched.get("test3_def")); + } + + /** + * Writes a single entry with put and reads it back with get. + */ + @Test + public void testputandget() throws Exception + { + geodeStore.put("test4_abc", "123"); + Assert.assertEquals("123", geodeStore.get("test4_abc")); + } + +}
