Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 9c557fca1 -> 377190a7d
MLHR-1938 #comment Concrete implementation of In memory storage agent for Apache Geode Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/377190a7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/377190a7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/377190a7 Branch: refs/heads/devel-3 Commit: 377190a7d948930fb483b6f034c0784745e0d364 Parents: 9c557fc Author: Ashish <[email protected]> Authored: Sun Feb 21 20:10:54 2016 +0530 Committer: Ashish <[email protected]> Committed: Tue Feb 23 00:51:31 2016 +0530 ---------------------------------------------------------------------- .../contrib/geode/GeodeCheckpointStore.java | 327 +++++++++++++++++++ .../geode/GeodeKeyValueStorageAgent.java | 64 ++++ .../contrib/geode/RegionCreateFunction.java | 80 +++++ .../contrib/geode/GeodeCheckpointStoreTest.java | 153 +++++++++ .../geode/GeodeKeyValueStorageAgentTest.java | 214 ++++++++++++ .../lib/util/AbstractKeyValueStorageAgent.java | 233 +++++++++++++ .../com/datatorrent/lib/util/StorageAgent.java | 98 ++++++ .../lib/util/StorageAgentKeyValueStore.java | 48 +++ 8 files changed, 1217 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java new file mode 100644 index 0000000..61113a9 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java @@ -0,0 +1,327 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.geode; + +import com.datatorrent.lib.util.StorageAgentKeyValueStore; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.client.ClientCache; +import com.gemstone.gemfire.cache.client.ClientCacheFactory; +import com.gemstone.gemfire.cache.client.ClientRegionShortcut; +import com.gemstone.gemfire.cache.execute.Execution; +import com.gemstone.gemfire.cache.execute.FunctionService; +import com.gemstone.gemfire.cache.query.Query; +import com.gemstone.gemfire.cache.query.QueryService; +import com.gemstone.gemfire.cache.query.SelectResults; +import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * Geode Store implementation of {@link StorageAgentKeyValueStore} Uses {@link Kryo} + * serialization to store retrieve objects + * + * + */ +public class GeodeCheckpointStore + implements StorageAgentKeyValueStore, 
Serializable +{ + + public static final String GET_KEYS_QUERY = + "SELECT entry.key FROM /$[region}.entries entry WHERE entry.key LIKE '${operator.id}%'"; + + private String geodeLocators; + private String geodeRegionName; + + public String getGeodeRegionName() + { + return geodeRegionName; + } + + public void setGeodeRegionName(String geodeRegionName) + { + this.geodeRegionName = geodeRegionName; + } + + protected transient Kryo kryo; + + public GeodeCheckpointStore() + { + geodeLocators = null; + kryo = null; + } + + /** + * Initializes Geode store by using locator connection string + * + * @param locatorString + */ + public GeodeCheckpointStore(String locatorString) + { + this.geodeLocators = locatorString; + kryo = new Kryo(); + } + + private Kryo getKyro() + { + if (kryo == null) { + kryo = new Kryo(); + } + return kryo; + } + + /** + * Get the Geode locator connection string + * + * @return locator connection string + */ + public String getGeodeLocators() + { + return geodeLocators; + } + + /** + * Sets the Geode locator string + * + * @param geodeLocators + */ + public void setGeodeLocators(String geodeLocators) + { + this.geodeLocators = geodeLocators; + } + + private transient ClientCache clientCache = null; + private transient Region<String, byte[]> region = null; + + /** + * Connect the Geode store by initializing Geode Client Cache + */ + @Override + public void connect() throws IOException + { + ClientCacheFactory factory = new ClientCacheFactory(); + Map<String, String> locators = parseLocatorString(geodeLocators); + + if (locators.size() == 0) { + throw new IllegalArgumentException("Invalid locator connection string " + geodeLocators); + } else { + for (Entry<String, String> entry : locators.entrySet()) { + factory.addPoolLocator(entry.getKey(), Integer.valueOf(entry.getValue())); + } + } + clientCache = factory.create(); + } + + private Region<String, byte[]> getGeodeRegion() throws IOException + { + if (clientCache == null) { + this.connect(); + } 
+ if (region == null) { + region = clientCache.getRegion(geodeRegionName); + if (region == null) { + createRegion(); + region = clientCache.<String, byte[]>createClientRegionFactory(ClientRegionShortcut.PROXY) + .create(geodeRegionName); + } + } + + return region; + } + + /** + * Creates a region + * + */ + public synchronized void createRegion() + { + RegionCreateFunction atcf = new RegionCreateFunction(); + java.util.List<Object> inputList = new java.util.ArrayList<Object>(); + + inputList.add(geodeRegionName); + inputList.add(true); + + Execution members = FunctionService.onServers(clientCache.getDefaultPool()).withArgs(inputList); + members.execute(atcf.getId()).getResult(); + } + + /** + * Disconnect the connection to Geode store by closing Client Cache connection + */ + @Override + public void disconnect() throws IOException + { + clientCache.close(); + } + + /** + * Check if store is connected to configured Geode cluster or not + * + * @return True is connected to Geode cluster and client cache is active + */ + @Override + public boolean isConnected() + { + if (clientCache == null) { + return false; + } + return !clientCache.isClosed(); + } + + /** + * Return the value for specified key from Geode region + * + * @return the value object + */ + @Override + public Object get(Object key) + { + + try { + byte[] obj = getGeodeRegion().get((String)key); + if (obj == null) { + return null; + } + + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(obj); + getKyro().setClassLoader(Thread.currentThread().getContextClassLoader()); + Input input = new Input(byteArrayInputStream); + return getKyro().readClassAndObject(input); + + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Put given given key & value in Geode region + */ + @Override + public void put(Object key, Object value) + { + try { + Output output = new Output(4096, Integer.MAX_VALUE); + getKyro().writeClassAndObject(output, value); + 
getGeodeRegion().put((String)key, output.getBuffer()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Removed the record associated for specified key from Geode region + */ + @Override + public void remove(Object key) + { + try { + getGeodeRegion().destroy((String)key); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Get list for keys starting from provided key name + * + * @return List of keys + */ + @Override + public List<String> getKeys(Object key) + { + List<String> keys = null; + try { + keys = queryIds((int)(key)); + return keys; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public List<String> queryIds(int operatorId) throws IOException + { + List<String> ids = new ArrayList<>(); + try { + QueryService queryService = clientCache.getQueryService(); + Query query = queryService.newQuery( + GET_KEYS_QUERY.replace("$[region}", geodeRegionName).replace("${operator.id}", String.valueOf(operatorId))); + logger.debug("executing query {} ", query.getQueryString()); + + SelectResults results = (SelectResults)query.execute(); + for (Iterator iterator = results.iterator(); iterator.hasNext();) { + ids.add(String.valueOf(iterator.next())); + } + + } catch (Exception e) { + throw new RuntimeException(e); + } + return ids; + } + + /** + * Sets the region name for this Store instance to connect to + */ + @Override + public void setTableName(String tableName) + { + this.geodeRegionName = tableName; + } + + private Map<String, String> parseLocatorString(String locatorConnString) + { + Map<String, String> locators = Maps.newHashMap(); + for (String locator : locatorConnString.split(",")) { + String[] parts = locator.split(":"); + if (parts.length > 1 && !parts[0].isEmpty() && parts[0] != "" && !parts[1].isEmpty() && parts[1] != "") { + locators.put(parts[0], parts[1]); + } else { + throw new IllegalArgumentException("Wrong locator connection string : " + locatorConnString + "\n" + + 
"Expected format locator1:locator1_port,locator2:locator2_port"); + } + } + return locators; + } + + private static final long serialVersionUID = 8897644407674960149L; + private static final Logger logger = LoggerFactory.getLogger(GeodeCheckpointStore.class); + + + @Override + public List<Object> getAll(List<Object> keys) + { + return null; + } + + @Override + public void putAll(Map<Object, Object> m) + { + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java new file mode 100644 index 0000000..750f7d8 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.geode; + +import java.io.Serializable; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.lib.util.AbstractKeyValueStorageAgent; + +/** + * Storage Agent implementation which uses {@link GeodeCheckpointStore} for operator + * checkpointing + * + * + */ +public class GeodeKeyValueStorageAgent extends AbstractKeyValueStorageAgent<GeodeCheckpointStore> implements Serializable +{ + + /** + * Geode locator connection string which needs to be provided by application + * developer/admin Format - locator1:locator1_port,locator2:locator2_port + */ + public static final String GEODE_LOCATOR_STRING = "dt.checkpoint.agent.geode.connection"; + + public GeodeKeyValueStorageAgent() + { + setStore(new GeodeCheckpointStore()); + } + + public GeodeKeyValueStorageAgent(Configuration conf) + { + setStore(new GeodeCheckpointStore(conf.get(GEODE_LOCATOR_STRING))); + } + + /** + * Saves yarn application id which can be used by Key value store to create + * application specific Geode region + */ + @Override + public void setApplicationId(String applicationId) + { + store.setTableName(applicationId); + super.setApplicationId(applicationId); + } + + private static final long serialVersionUID = -8838327680202565778L; +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java b/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java new file mode 100644 index 0000000..5637953 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.geode; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheClosedException; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.Declarable; +import com.gemstone.gemfire.cache.RegionExistsException; +import com.gemstone.gemfire.cache.RegionShortcut; +import com.gemstone.gemfire.cache.execute.FunctionAdapter; +import com.gemstone.gemfire.cache.execute.FunctionContext; + +/** + * Function to create region dynamically through client API + * + */ +public class RegionCreateFunction extends FunctionAdapter implements Declarable +{ + + @Override + public void init(Properties arg0) + { + } + + /** + * Create a region in Geode cluster + */ + @Override + public void execute(FunctionContext context) + { + List<Object> args = (List<Object>)context.getArguments(); + String regionName = (String)args.get(0); + + try { + Cache cache = CacheFactory.getAnyInstance(); + if (cache.getRegion(regionName) == null) { + + cache.createRegionFactory(RegionShortcut.PARTITION).create(regionName); + } + } catch (RegionExistsException re) { + context.getResultSender().lastResult(new ArrayList<Integer>()); + } catch 
(CacheClosedException e) { + context.getResultSender().lastResult(new ArrayList<Integer>()); + } + context.getResultSender().lastResult(new ArrayList<Integer>()); + } + + /** + * Return name to create Region + */ + @Override + public String getId() + { + return this.getClass().getName(); + } + + private static final long serialVersionUID = 2450085868879041729L; + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java new file mode 100644 index 0000000..5c59622 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.geode; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.gemstone.gemfire.cache.query.FunctionDomainException; +import com.gemstone.gemfire.cache.query.NameResolutionException; +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException; +import com.gemstone.gemfire.cache.query.TypeMismatchException; +import com.google.common.collect.Maps; + +public class GeodeCheckpointStoreTest +{ + + static String LOCATOR_HOST = "localhost:10334"; + static final String REGION_NAME = "apex-checkpoint-region"; + + static final String KEY1 = "key1"; + static final String KEY2 = "key2"; + static final String KEY3 = "key3"; + + static GeodeCheckpointStore store; + + @Before + public void setUp() throws Exception + { + if (System.getProperty("dev.locator.connection") != null) { + LOCATOR_HOST = System.getProperty("dev.locator.connection"); + } + + store = new GeodeCheckpointStore(LOCATOR_HOST); + store.setTableName(REGION_NAME); + store.connect(); + } + + @Test + public void testSave() throws IOException + { + Map<Integer, String> data = Maps.newHashMap(); + data.put(1, "one"); + data.put(2, "two"); + data.put(3, "three"); + store.put(KEY1, data); + @SuppressWarnings("unchecked") + Map<Integer, String> decoded = (Map<Integer, String>)store.get(KEY1); + Assert.assertEquals("dataOf1", data, decoded); + } + + @Test + public void testLoad() throws IOException + { + Map<Integer, String> dataOf1 = Maps.newHashMap(); + dataOf1.put(1, "one"); + dataOf1.put(2, "two"); + dataOf1.put(3, "three"); + + Map<Integer, String> dataOf2 = Maps.newHashMap(); + dataOf2.put(4, "four"); + dataOf2.put(5, "five"); + dataOf2.put(6, "six"); + + store.put(KEY1, dataOf1); + store.put(KEY2, dataOf2); + @SuppressWarnings("unchecked") + 
Map<Integer, String> decoded1 = (Map<Integer, String>)store.get(KEY1); + + @SuppressWarnings("unchecked") + Map<Integer, String> decoded2 = (Map<Integer, String>)store.get(KEY2); + Assert.assertEquals("data of 1", dataOf1, decoded1); + Assert.assertEquals("data of 2", dataOf2, decoded2); + } + + @Test + public void testDelete() throws IOException, FunctionDomainException, TypeMismatchException, NameResolutionException, + QueryInvocationTargetException + { + testLoad(); + + store.remove(KEY1); + Assert.assertTrue("operator 2 window 1", (store.get(KEY2) != null)); + Assert.assertFalse("operator 1 window 1", (store.get(KEY1) != null)); + } + + //@Test + public void testGetWindowIds() throws IOException + { + store.disconnect(); + final String REGION_NAME = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date()); + store.setTableName(REGION_NAME); + store.connect(); + Map<Integer, String> obj = Maps.newHashMap(); + obj.put(1, "one"); + obj.put(2, "two"); + obj.put(3, "three"); + + List<String> op1WindowIds = new ArrayList<>(); + op1WindowIds.add("111"); + op1WindowIds.add("112"); + op1WindowIds.add("113"); + for (String l : op1WindowIds) { + store.put(l, obj); + } + + List<String> op2WindowIds = new ArrayList<>(); + op2WindowIds.add("211"); + op2WindowIds.add("212"); + for (String l : op2WindowIds) { + store.put(l, obj); + } + + List<String> op1WinIds = store.getKeys(1); + List<String> op2WinIds = store.getKeys(2); + + Assert.assertEquals(op1WindowIds.size(), op1WinIds.size()); + Assert.assertEquals(op2WindowIds.size(), op2WinIds.size()); + Assert.assertTrue(op1WindowIds.containsAll(op1WinIds)); + Assert.assertTrue(op2WindowIds.containsAll(op2WinIds)); + } + + @After + public void tearDown() throws Exception + { + store.disconnect(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgentTest.java 
---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgentTest.java b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgentTest.java new file mode 100644 index 0000000..74e5c1c --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgentTest.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.geode; + +import java.io.File; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + +import com.gemstone.gemfire.cache.query.FunctionDomainException; +import com.gemstone.gemfire.cache.query.NameResolutionException; +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException; +import com.gemstone.gemfire.cache.query.TypeMismatchException; +import com.google.common.collect.Maps; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.DAG; + +/** + * Test setup instructions + * + * Setup a local Geode cluster by starting locator & server using Geode's Gfsh shell + * + * start Gfsh - gemfire-assembly/build/install/apache-geode/bin/gfsh + * + * gfsh>start locator - gfsh>start locator --name=L1 + * + * gfsh>start server - gfsh>start server --name=S1 + * + * Checkpointing storage agent needs to dynamically create Geode regions for per application. 
+ * + * To be able Programmatically create Geode region deploy below Server function through Gfsh + * + * > jar -cvf geode-fun.jar com/datatorrent/contrib/geode/RegionCreateFunction.class + * + * gfsh> deploy --jar=/tmp/jars/geode-fun.jar + * + * gfsh> list functions // verify RegionCreateFunction is listed + * + * gfsh> describe member --name=L1 + * + * provide locators details from above command in LOCATOR_HOST as <>locator-host:<locator-io> + */ +public class GeodeKeyValueStorageAgentTest + +{ + private static class TestMeta extends TestWatcher + { + String applicationPath; + GeodeKeyValueStorageAgent storageAgent; + static String LOCATOR_HOST = "localhost:10334"; + static final String REGION_NAME = "GeodeKeyValueStorageAgentTest"; + + @Override + protected void starting(Description description) + { + super.starting(description); + + applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); + if (System.getProperty("dev.locator.connection") != null) { + LOCATOR_HOST = System.getProperty("dev.locator.connection"); + } + try { + FileUtils.forceMkdir(new File("target/" + description.getClassName())); + } catch (IOException e) { + throw new RuntimeException(e); + } + + Configuration config = new Configuration(); + config.set(GeodeKeyValueStorageAgent.GEODE_LOCATOR_STRING, LOCATOR_HOST); + + storageAgent = new GeodeKeyValueStorageAgent(config); + storageAgent.setApplicationId(REGION_NAME); + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, applicationPath); + } + + @Override + protected void finished(Description description) + { + try { + storageAgent.getStore().disconnect(); + FileUtils.deleteDirectory(new File("target/" + description.getClassName())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testSave() throws IOException + { + 
Map<Integer, String> data = Maps.newHashMap(); + data.put(1, "one"); + data.put(2, "two"); + data.put(3, "three"); + testMeta.storageAgent.save(data, 1, 1); + @SuppressWarnings("unchecked") + Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageAgent.load(1, 1); + Assert.assertEquals("dataOf1", data, decoded); + } + + @Test + public void testLoad() throws IOException + { + Map<Integer, String> dataOf1 = Maps.newHashMap(); + dataOf1.put(1, "one"); + dataOf1.put(2, "two"); + dataOf1.put(3, "three"); + + Map<Integer, String> dataOf2 = Maps.newHashMap(); + dataOf2.put(4, "four"); + dataOf2.put(5, "five"); + dataOf2.put(6, "six"); + + testMeta.storageAgent.save(dataOf1, 1, 1); + testMeta.storageAgent.save(dataOf2, 2, 1); + @SuppressWarnings("unchecked") + Map<Integer, String> decoded1 = (Map<Integer, String>)testMeta.storageAgent.load(1, 1); + + @SuppressWarnings("unchecked") + Map<Integer, String> decoded2 = (Map<Integer, String>)testMeta.storageAgent.load(2, 1); + Assert.assertEquals("data of 1", dataOf1, decoded1); + Assert.assertEquals("data of 2", dataOf2, decoded2); + } + + @Test + public void testRecovery() throws IOException + { + testSave(); + Configuration config = new Configuration(); + config.set(GeodeKeyValueStorageAgent.GEODE_LOCATOR_STRING, testMeta.LOCATOR_HOST); + testMeta.storageAgent = new GeodeKeyValueStorageAgent(config); + testMeta.storageAgent.setApplicationId(testMeta.REGION_NAME); + testSave(); + } + + @Test + public void testDelete() throws IOException, FunctionDomainException, TypeMismatchException, NameResolutionException, + QueryInvocationTargetException + { + testLoad(); + + testMeta.storageAgent.delete(1, 1); + Assert.assertTrue("operator 2 window 1", (testMeta.storageAgent.load(2, 1) != null)); + Assert.assertFalse("operator 1 window 1", (testMeta.storageAgent.load(1, 1) != null)); + } + + //@Test + public void testGetWindowIds() throws IOException + { + final String REGION_NAME = new 
SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date()); + testMeta.storageAgent.setApplicationId(REGION_NAME); + Map<Integer, String> obj = Maps.newHashMap(); + obj.put(1, "one"); + obj.put(2, "two"); + obj.put(3, "three"); + + long[] op1WindowIds = {111, 112, 113}; + for (long l : op1WindowIds) { + testMeta.storageAgent.save(obj, 1, l); + } + + long[] op2WindowIds = {211, 212}; + for (long l : op2WindowIds) { + testMeta.storageAgent.save(obj, 2, l); + } + + Arrays.sort(op1WindowIds); + Arrays.sort(op2WindowIds); + long[] op1WinIds = testMeta.storageAgent.getWindowIds(1); + long[] op2WinIds = testMeta.storageAgent.getWindowIds(2); + Arrays.sort(op1WinIds); + Arrays.sort(op2WinIds); + + Assert.assertEquals(op1WindowIds.length, op1WinIds.length); + Assert.assertEquals(op2WindowIds.length, op2WinIds.length); + + Assert.assertTrue(Arrays.equals(op1WindowIds, op1WinIds)); + Assert.assertTrue(Arrays.equals(op2WindowIds, op2WinIds)); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java b/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java new file mode 100644 index 0000000..520877b --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.lib.util.StorageAgent.ApplicationAwareStorageAgent;
+
+/**
+ * Abstract implementation of {@link ApplicationAwareStorageAgent} which can be
+ * configured with a KeyValue store implementing {@link StorageAgentKeyValueStore}.
+ *
+ * NOTE - this should be picked from APEX-CORE once the feature below is released
+ * https://issues.apache.org/jira/browse/APEXCORE-283
+ *
+ * @param <S>
+ *          Store implementation
+ */
+public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValueStore>
+    implements ApplicationAwareStorageAgent, Serializable
+{
+
+  protected S store;
+  protected String applicationId;
+  public static final String CHECKPOINT_KEY_SEPARATOR = "-";
+
+  /**
+   * Gets the store
+   *
+   * @return the store
+   */
+  public S getStore()
+  {
+    return store;
+  }
+
+  /**
+   * Sets the store
+   *
+   * @param store the key-value store used to persist checkpoints
+   */
+  public void setStore(S store)
+  {
+    this.store = store;
+  }
+
+  /**
+   * Returns the YARN application id of the running application.
+   *
+   * @return the application id, or null if it has not been set
+   */
+  public String getApplicationId()
+  {
+    return applicationId;
+  }
+
+  /**
+   * Sets the YARN application id.
+   *
+   * @param applicationId the application id
+   */
+  public void setApplicationId(String applicationId)
+  {
+    this.applicationId = applicationId;
+  }
+
+  /**
+   * Generates a key from the operator id and window id so that each operator
+   * checkpoint is stored under a unique key of the form {@code operatorId-windowId}.
+   *
+   * @param operatorId id of the operator
+   * @param windowId id of the checkpointed window
+   * @return unique key for store
+   */
+  public static String generateKey(int operatorId, long windowId)
+  {
+    return operatorId + CHECKPOINT_KEY_SEPARATOR + windowId;
+  }
+
+  /**
+   * Stores the given operator object in the configured store.
+   *
+   * @param object
+   *          Operator object to store
+   * @param operatorId
+   *          id of operator
+   * @param windowId
+   *          window id of operator to checkpoint
+   * @throws RuntimeException wrapping any failure, including connection errors
+   */
+  @Override
+  public void save(Object object, int operatorId, long windowId) throws IOException
+  {
+    String checkpointKey = generateKey(operatorId, windowId);
+    try {
+      store(checkpointKey, object);
+      logger.debug("saved check point object key {} region {}", checkpointKey, applicationId);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private synchronized void store(String checkpointKey, Object operator) throws IOException
+  {
+    connectIfNeeded();
+    getStore().put(checkpointKey, operator);
+  }
+
+  /**
+   * Retrieves the operator object for the given operator and window from the
+   * configured store.
+   *
+   * @param operatorId
+   *          id of operator
+   * @param windowId
+   *          window id of the checkpoint to load
+   * @return the object stored under the generated key
+   * @throws RuntimeException wrapping any failure, including connection errors
+   */
+  @Override
+  public Object load(int operatorId, long windowId)
+  {
+    String checkpointKey = generateKey(operatorId, windowId);
+    try {
+      Object obj = retrieve(checkpointKey);
+      logger.debug("retrieved object from store key {} region {} ", checkpointKey, applicationId);
+      return obj;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private synchronized Object retrieve(String checkpointKey) throws IOException
+  {
+    connectIfNeeded();
+    return getStore().get(checkpointKey);
+  }
+
+  /**
+   * Removes the stored operator object for the given operatorId and windowId from the store.
+   *
+   * @throws IOException if connecting to the store fails
+   */
+  @Override
+  public void delete(int operatorId, long windowId) throws IOException
+  {
+    // connect outside the try so connection failures surface as IOException
+    connectIfNeeded();
+    String checkpointKey = generateKey(operatorId, windowId);
+    try {
+      getStore().remove(checkpointKey);
+      logger.debug("deleted object from store key {} region {}", checkpointKey, applicationId);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Returns the window ids for which objects of the given operator are stored
+   * but not yet removed.
+   *
+   * @throws IOException if connecting to the store fails
+   */
+  @Override
+  public long[] getWindowIds(int operatorId) throws IOException
+  {
+    connectIfNeeded();
+    List<String> keys = getStore().getKeys(operatorId);
+    if (keys == null) {
+      return new long[0];
+    }
+    long[] windowIds = new long[keys.size()];
+    int index = 0;
+    for (String key : keys) {
+      windowIds[index++] = extractwindowId(key);
+    }
+    return windowIds;
+  }
+
+  /**
+   * Extracts the window id from a key produced by {@link #generateKey(int, long)}.
+   * The key is split at the first separator after the leading character, so that
+   * negative operator or window ids (e.g. {@code "3--1"}) are parsed correctly.
+   *
+   * @param checkpointKey key of the form {@code operatorId-windowId}
+   * @return the window id encoded in the key
+   * @throws IllegalArgumentException if the key contains no separator
+   */
+  public static long extractwindowId(String checkpointKey)
+  {
+    int separatorIndex = checkpointKey.indexOf(CHECKPOINT_KEY_SEPARATOR, 1);
+    if (separatorIndex < 0) {
+      throw new IllegalArgumentException("Invalid checkpoint key: " + checkpointKey);
+    }
+    return Long.parseLong(checkpointKey.substring(separatorIndex + CHECKPOINT_KEY_SEPARATOR.length()));
+  }
+
+  /**
+   * Saves the YARN application id and passes it to the store as its table/region
+   * name, so that the store can use an application specific table/region.
+   */
+  @Override
+  public void setApplicationAttributes(AttributeMap map)
+  {
+    this.applicationId = map.get(DAGContext.APPLICATION_ID);
+    getStore().setTableName(applicationId);
+  }
+
+  /**
+   * Connects to the configured store if it is not already connected.
+   *
+   * @throws IOException if connecting fails
+   */
+  private void connectIfNeeded() throws IOException
+  {
+    if (!getStore().isConnected()) {
+      getStore().connect();
+    }
+  }
+
+  private static final long serialVersionUID = 7065320156997171116L;
+  private static final Logger logger = LoggerFactory.getLogger(AbstractKeyValueStorageAgent.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/library/src/main/java/com/datatorrent/lib/util/StorageAgent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/StorageAgent.java b/library/src/main/java/com/datatorrent/lib/util/StorageAgent.java
new file mode 100644
index 0000000..7e2ce67
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/util/StorageAgent.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.io.IOException;
+
+import com.datatorrent.api.Attribute.AttributeMap;
+
+/**
+ * Interface to define writing/reading checkpoint state of any operator.
+ *
+ * @since 0.9.4
+ */
+public interface StorageAgent
+{
+  /**
+   * Save the object so that the same object can be loaded later using the given combination of
+   * operatorId and windowId.
+   *
+   * Typically the storage agent would serialize the state of the object during the save call.
+   * The serialized state can be accessed from anywhere on the cluster to recreate the object
+   * through the load callback.
+   *
+   * @param object - The operator whose state needs to be saved.
+   * @param operatorId - Identifier of the operator.
+   * @param windowId - Identifier for the specific state of the operator.
+   * @throws IOException if the object could not be saved
+   */
+  public void save(Object object, int operatorId, long windowId) throws IOException;
+
+  /**
+   * Load the object that was previously saved for the given operatorId and windowId.
+   *
+   * @param operatorId Id for which the object was previously saved
+   * @param windowId WindowId for which the object was previously saved
+   * @return object (or a copy of it) which was saved earlier using the save call.
+   * @throws IOException if the object could not be loaded
+   */
+  public Object load(int operatorId, long windowId) throws IOException;
+
+  /**
+   * Delete the artifacts related to the save call for the operatorId and the windowId.
+   *
+   * Through this call, the agent is informed that the object saved against the operatorId
+   * and the windowId together will not be needed again.
+   *
+   * @param operatorId - Identifier of the operator.
+   * @param windowId - Identifier for the specific state of the operator.
+   * @throws IOException if the artifacts could not be deleted
+   */
+  public void delete(int operatorId, long windowId) throws IOException;
+
+  /**
+   * Return the windowIds for which an object was saved but not yet deleted.
+   *
+   * The result is essentially the difference between two sets. The first set contains
+   * all the windowIds passed using the successful save calls. The second set contains all
+   * the windowIds passed using the successful delete calls.
+   *
+   * @param operatorId - The operator for which the state was saved.
+   * @return array of windowIds for available states that can be retrieved through load.
+   * @throws IOException if the windowIds could not be determined
+   */
+  public long[] getWindowIds(int operatorId) throws IOException;
+
+  /**
+   * Extension of {@link StorageAgent} for agents that need to know the attributes
+   * of the application; they are passed in through
+   * {@link #setApplicationAttributes(AttributeMap)}.
+   */
+  public interface ApplicationAwareStorageAgent extends StorageAgent
+  {
+
+    /**
+     * Passes attributes of application to storage agent
+     *
+     * @param map attributes of application
+     */
+    public void setApplicationAttributes(AttributeMap map);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java b/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java
new file mode 100644
index 0000000..276019d
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.util.List;
+
+import com.datatorrent.lib.db.KeyValueStore;
+
+/**
+ * Key-value store used by {@link AbstractKeyValueStorageAgent} to persist operator
+ * checkpoints. In addition to the {@link KeyValueStore} operations it can list the
+ * keys associated with a given key and select the table/region it works on.
+ */
+public interface StorageAgentKeyValueStore extends KeyValueStore
+{
+
+  /**
+   * Returns all keys associated with the given key.
+   *
+   * @param key lookup key, e.g. an operator id whose checkpoint keys are requested
+   * @return the list of all associated keys
+   */
+  public List<String> getKeys(Object key);
+
+  /**
+   * Sets the table/region name used by the store.
+   *
+   * @param tableName name of the table/region
+   */
+  public void setTableName(String tableName);
+
+}
