Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 9c557fca1 -> 377190a7d


 MLHR-1938 #comment Concrete implementation of In memory storage agent for 
Apache Geode


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/377190a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/377190a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/377190a7

Branch: refs/heads/devel-3
Commit: 377190a7d948930fb483b6f034c0784745e0d364
Parents: 9c557fc
Author: Ashish <[email protected]>
Authored: Sun Feb 21 20:10:54 2016 +0530
Committer: Ashish <[email protected]>
Committed: Tue Feb 23 00:51:31 2016 +0530

----------------------------------------------------------------------
 .../contrib/geode/GeodeCheckpointStore.java     | 327 +++++++++++++++++++
 .../geode/GeodeKeyValueStorageAgent.java        |  64 ++++
 .../contrib/geode/RegionCreateFunction.java     |  80 +++++
 .../contrib/geode/GeodeCheckpointStoreTest.java | 153 +++++++++
 .../geode/GeodeKeyValueStorageAgentTest.java    | 214 ++++++++++++
 .../lib/util/AbstractKeyValueStorageAgent.java  | 233 +++++++++++++
 .../com/datatorrent/lib/util/StorageAgent.java  |  98 ++++++
 .../lib/util/StorageAgentKeyValueStore.java     |  48 +++
 8 files changed, 1217 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java 
b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java
new file mode 100644
index 0000000..61113a9
--- /dev/null
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.geode;
+
+import com.datatorrent.lib.util.StorageAgentKeyValueStore;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
+import com.gemstone.gemfire.cache.execute.Execution;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Geode implementation of {@link StorageAgentKeyValueStore}. Uses {@link Kryo}
+ * serialization to store and retrieve objects.
+ */
+public class GeodeCheckpointStore
+    implements StorageAgentKeyValueStore, Serializable
+{
+
+  public static final String GET_KEYS_QUERY = 
+      "SELECT entry.key FROM /$[region}.entries entry WHERE entry.key LIKE 
'${operator.id}%'";
+
+  private String geodeLocators;
+  private String geodeRegionName;
+
+  /**
+   * Gets the name of the Geode region used by this store.
+   *
+   * @return the configured region name
+   */
+  public String getGeodeRegionName()
+  {
+    return this.geodeRegionName;
+  }
+
+  public void setGeodeRegionName(String geodeRegionName)
+  {
+    this.geodeRegionName = geodeRegionName;
+  }
+
+  protected transient Kryo kryo;
+
+  /**
+   * Creates a store with the locator string and the Kryo instance left unset.
+   * The Kryo instance is created on demand by {@code getKyro()}.
+   */
+  public GeodeCheckpointStore()
+  {
+    this.geodeLocators = null;
+    this.kryo = null;
+  }
+
+  /**
+   * Creates a store for the given locator connection string and eagerly
+   * creates the Kryo instance used for (de)serialization.
+   *
+   * @param locatorString
+   *          locator connection string
+   */
+  public GeodeCheckpointStore(String locatorString)
+  {
+    this.kryo = new Kryo();
+    this.geodeLocators = locatorString;
+  }
+
+  /**
+   * Returns the Kryo instance, creating it first if the field is still null.
+   *
+   * @return the Kryo instance of this store
+   */
+  private Kryo getKyro()
+  {
+    if (kryo != null) {
+      return kryo;
+    }
+    kryo = new Kryo();
+    return kryo;
+  }
+
+  /**
+   * Gets the Geode locator connection string.
+   *
+   * @return the locator connection string
+   */
+  public String getGeodeLocators()
+  {
+    return this.geodeLocators;
+  }
+
+  /**
+   * Sets the Geode locator connection string.
+   *
+   * @param geodeLocators
+   *          locator connection string; {@code connect()} parses it as
+   *          locator1:locator1_port,locator2:locator2_port
+   */
+  public void setGeodeLocators(String geodeLocators)
+  {
+    this.geodeLocators = geodeLocators;
+  }
+
+  private transient ClientCache clientCache = null;
+  private transient Region<String, byte[]> region = null;
+
+  /**
+   * Connects to Geode by creating a client cache whose pool is configured with
+   * every locator found in the locator connection string.
+   *
+   * @throws IllegalArgumentException
+   *           if the locator connection string yields no locators
+   */
+  @Override
+  public void connect() throws IOException
+  {
+    ClientCacheFactory cacheFactory = new ClientCacheFactory();
+    Map<String, String> hostToPort = parseLocatorString(geodeLocators);
+
+    if (hostToPort.isEmpty()) {
+      throw new IllegalArgumentException("Invalid locator connection string " + geodeLocators);
+    }
+
+    for (Entry<String, String> locator : hostToPort.entrySet()) {
+      String host = locator.getKey();
+      Integer port = Integer.valueOf(locator.getValue());
+      cacheFactory.addPoolLocator(host, port);
+    }
+    clientCache = cacheFactory.create();
+  }
+
+  /**
+   * Returns the region handle, connecting and resolving it on first use.
+   * When the client cache does not know the region yet, the region is created
+   * through {@link #createRegion()} and a PROXY client region is created for it.
+   *
+   * @return the cached region handle
+   */
+  private Region<String, byte[]> getGeodeRegion() throws IOException
+  {
+    if (clientCache == null) {
+      this.connect();
+    }
+    if (region != null) {
+      return region;
+    }
+
+    region = clientCache.getRegion(geodeRegionName);
+    if (region == null) {
+      createRegion();
+      region = clientCache.<String, byte[]>createClientRegionFactory(ClientRegionShortcut.PROXY)
+          .create(geodeRegionName);
+    }
+    return region;
+  }
+
+  /**
+   * Creates the configured region by executing {@link RegionCreateFunction} on
+   * the servers of the client cache's default pool and waiting for the result.
+   * The function arguments are the region name followed by a boolean flag.
+   */
+  public synchronized void createRegion()
+  {
+    RegionCreateFunction createFunction = new RegionCreateFunction();
+
+    List<Object> functionArgs = new ArrayList<Object>();
+    functionArgs.add(geodeRegionName);
+    functionArgs.add(true);
+
+    Execution onServers = FunctionService.onServers(clientCache.getDefaultPool()).withArgs(functionArgs);
+    String functionId = createFunction.getId();
+    onServers.execute(functionId).getResult();
+  }
+
+  /**
+   * Disconnects from the Geode store by closing the client cache.
+   * <p>
+   * Calling this on a store that was never connected is a no-op. The cached
+   * cache and region references are cleared so that a later operation
+   * reconnects instead of reusing handles of the closed cache.
+   */
+  @Override
+  public void disconnect() throws IOException
+  {
+    ClientCache cacheToClose = clientCache;
+    // clear the handles first so they are dropped even if close() throws
+    clientCache = null;
+    region = null;
+    if (cacheToClose != null) {
+      cacheToClose.close();
+    }
+  }
+
+  /**
+   * Checks whether this store holds a client cache that has not been closed.
+   *
+   * @return true if a client cache exists and is not closed, false otherwise
+   */
+  @Override
+  public boolean isConnected()
+  {
+    return clientCache != null && !clientCache.isClosed();
+  }
+
+  /**
+   * Reads the bytes stored under the given key from the Geode region and
+   * deserializes them with Kryo, using the current thread's context class
+   * loader.
+   *
+   * @param key
+   *          the key; it is cast to {@link String}
+   * @return the deserialized value, or null if the region has no value for the key
+   * @throws RuntimeException
+   *           wrapping any exception raised while reading or deserializing
+   */
+  @Override
+  public Object get(Object key)
+  {
+    try {
+      byte[] serialized = getGeodeRegion().get((String)key);
+      if (serialized != null) {
+        ByteArrayInputStream bytesIn = new ByteArrayInputStream(serialized);
+        Kryo serializer = getKyro();
+        serializer.setClassLoader(Thread.currentThread().getContextClassLoader());
+        return getKyro().readClassAndObject(new Input(bytesIn));
+      }
+      return null;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Serializes the value with Kryo and stores the resulting bytes under the
+   * given key in the Geode region.
+   *
+   * @param key
+   *          the key; it is cast to {@link String}
+   * @param value
+   *          the object to serialize and store
+   * @throws RuntimeException
+   *           wrapping any exception raised while serializing or storing
+   */
+  @Override
+  public void put(Object key, Object value)
+  {
+    try {
+      Output output = new Output(4096, Integer.MAX_VALUE);
+      getKyro().writeClassAndObject(output, value);
+      // toBytes() copies only the bytes actually written; getBuffer() would
+      // store the whole backing array including its unused tail
+      getGeodeRegion().put((String)key, output.toBytes());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Removes the record stored under the given key by destroying the entry in
+   * the Geode region.
+   *
+   * @param key
+   *          the key; it is cast to {@link String}
+   * @throws RuntimeException
+   *           wrapping any exception raised by the region
+   */
+  @Override
+  public void remove(Object key)
+  {
+    try {
+      String regionKey = (String)key;
+      getGeodeRegion().destroy(regionKey);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Get list for keys starting from provided key name
+   * 
+   * @return List of keys
+   */
+  @Override
+  public List<String> getKeys(Object key)
+  {
+    List<String> keys = null;
+    try {
+      keys = queryIds((int)(key));
+      return keys;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public List<String> queryIds(int operatorId) throws IOException
+  {
+    List<String> ids = new ArrayList<>();
+    try {
+      QueryService queryService = clientCache.getQueryService();
+      Query query = queryService.newQuery(
+          GET_KEYS_QUERY.replace("$[region}", 
geodeRegionName).replace("${operator.id}", String.valueOf(operatorId)));
+      logger.debug("executing query {} ", query.getQueryString());
+
+      SelectResults results = (SelectResults)query.execute();
+      for (Iterator iterator = results.iterator(); iterator.hasNext();) {
+        ids.add(String.valueOf(iterator.next()));
+      }
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return ids;
+  }
+
+  /**
+   * Sets the region name for this Store instance to connect to
+   */
+  @Override
+  public void setTableName(String tableName)
+  {
+    this.geodeRegionName = tableName;
+  }
+
+  /**
+   * Parses a locator connection string of the form
+   * locator1:locator1_port,locator2:locator2_port into a host-to-port map.
+   * Whitespace around hosts and ports is ignored.
+   *
+   * @param locatorConnString
+   *          the locator connection string
+   * @return map from locator host to locator port (both as strings)
+   * @throws IllegalArgumentException
+   *           if the string is null or any entry lacks a non-empty host or port
+   */
+  private Map<String, String> parseLocatorString(String locatorConnString)
+  {
+    String errorMessage = "Wrong locator connection string : " + locatorConnString + "\n"
+        + "Expected format locator1:locator1_port,locator2:locator2_port";
+
+    // the no-arg constructor leaves the locator string null; report that as a
+    // configuration error instead of failing with a NullPointerException
+    if (locatorConnString == null) {
+      throw new IllegalArgumentException(errorMessage);
+    }
+
+    Map<String, String> locators = Maps.newHashMap();
+    for (String locator : locatorConnString.split(",")) {
+      String[] parts = locator.split(":");
+      if (parts.length < 2) {
+        throw new IllegalArgumentException(errorMessage);
+      }
+      String host = parts[0].trim();
+      String port = parts[1].trim();
+      if (host.isEmpty() || port.isEmpty()) {
+        throw new IllegalArgumentException(errorMessage);
+      }
+      locators.put(host, port);
+    }
+    return locators;
+  }
+
+  private static final long serialVersionUID = 8897644407674960149L;
+  private static final Logger logger = 
LoggerFactory.getLogger(GeodeCheckpointStore.class);
+
+
+  /**
+   * Gets the values for the given keys by looking each key up with
+   * {@link #get(Object)}.
+   *
+   * @param keys
+   *          keys to look up; null is treated as an empty list
+   * @return the values in the order of the keys, with null for keys that have
+   *         no stored value; never null
+   */
+  @Override
+  public List<Object> getAll(List<Object> keys)
+  {
+    List<Object> values = new ArrayList<>();
+    if (keys == null) {
+      return values;
+    }
+    for (Object key : keys) {
+      values.add(get(key));
+    }
+    return values;
+  }
+
+  /**
+   * Stores every entry of the given map by calling
+   * {@link #put(Object, Object)} for each key/value pair.
+   *
+   * @param m
+   *          entries to store; null is treated as an empty map
+   */
+  @Override
+  public void putAll(Map<Object, Object> m)
+  {
+    if (m == null) {
+      return;
+    }
+    for (Entry<Object, Object> entry : m.entrySet()) {
+      put(entry.getKey(), entry.getValue());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java
 
b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java
new file mode 100644
index 0000000..750f7d8
--- /dev/null
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.geode;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.lib.util.AbstractKeyValueStorageAgent;
+
+/**
+ * Storage agent that uses a {@link GeodeCheckpointStore} for operator
+ * checkpointing.
+ */
+public class GeodeKeyValueStorageAgent extends AbstractKeyValueStorageAgent<GeodeCheckpointStore>
+    implements Serializable
+{
+
+  private static final long serialVersionUID = -8838327680202565778L;
+
+  /**
+   * Configuration key of the Geode locator connection string, which has to be
+   * provided by the application developer/admin.
+   * Format - locator1:locator1_port,locator2:locator2_port
+   */
+  public static final String GEODE_LOCATOR_STRING = "dt.checkpoint.agent.geode.connection";
+
+  /**
+   * Creates an agent backed by a store that has no locators configured yet.
+   */
+  public GeodeKeyValueStorageAgent()
+  {
+    GeodeCheckpointStore checkpointStore = new GeodeCheckpointStore();
+    setStore(checkpointStore);
+  }
+
+  /**
+   * Creates an agent backed by a store that uses the locator connection string
+   * found under {@link #GEODE_LOCATOR_STRING} in the given configuration.
+   *
+   * @param conf
+   *          configuration to read the locator connection string from
+   */
+  public GeodeKeyValueStorageAgent(Configuration conf)
+  {
+    String locators = conf.get(GEODE_LOCATOR_STRING);
+    setStore(new GeodeCheckpointStore(locators));
+  }
+
+  /**
+   * Passes the application id to the store as its table (region) name and then
+   * records it through the superclass.
+   */
+  @Override
+  public void setApplicationId(String applicationId)
+  {
+    store.setTableName(applicationId);
+    super.setApplicationId(applicationId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java 
b/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java
new file mode 100644
index 0000000..5637953
--- /dev/null
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.geode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Declarable;
+import com.gemstone.gemfire.cache.RegionExistsException;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.execute.FunctionAdapter;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+
+/**
+ * Function that creates a region dynamically; it is invoked through the
+ * client API with the region name as first argument.
+ */
+public class RegionCreateFunction extends FunctionAdapter implements Declarable
+{
+
+  private static final long serialVersionUID = 2450085868879041729L;
+
+  /**
+   * No initialization is required; the properties are ignored.
+   */
+  @Override
+  public void init(Properties arg0)
+  {
+  }
+
+  /**
+   * Creates a PARTITION region named by the first function argument unless the
+   * cache already has a region of that name, then sends an empty list as the
+   * one and only (last) result.
+   * <p>
+   * The result is sent exactly once, after the try block. Sending it from the
+   * catch blocks as well would send a second "last" result on the exception
+   * paths.
+   *
+   * @param context
+   *          function context; its arguments must be a list whose first
+   *          element is the region name
+   */
+  @Override
+  public void execute(FunctionContext context)
+  {
+    @SuppressWarnings("unchecked")
+    List<Object> args = (List<Object>)context.getArguments();
+    String regionName = (String)args.get(0);
+
+    try {
+      Cache cache = CacheFactory.getAnyInstance();
+      if (cache.getRegion(regionName) == null) {
+        cache.createRegionFactory(RegionShortcut.PARTITION).create(regionName);
+      }
+    } catch (RegionExistsException ignored) {
+      // the region appeared between the check and the create; nothing left to do
+    } catch (CacheClosedException ignored) {
+      // as before, a closed cache is not reported as a failure to the caller
+    }
+    context.getResultSender().lastResult(new ArrayList<Integer>());
+  }
+
+  /**
+   * Returns the id under which this function is executed: the class name.
+   */
+  @Override
+  public String getId()
+  {
+    return this.getClass().getName();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java
 
b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java
new file mode 100644
index 0000000..5c59622
--- /dev/null
+++ 
b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.geode;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.gemstone.gemfire.cache.query.FunctionDomainException;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
+import com.gemstone.gemfire.cache.query.TypeMismatchException;
+import com.google.common.collect.Maps;
+
+public class GeodeCheckpointStoreTest
+{
+
+  static String LOCATOR_HOST = "localhost:10334";
+  static final String REGION_NAME = "apex-checkpoint-region";
+
+  static final String KEY1 = "key1";
+  static final String KEY2 = "key2";
+  static final String KEY3 = "key3";
+
+  static GeodeCheckpointStore store;
+
+  /**
+   * Creates and connects the store under test. The locator can be overridden
+   * with the dev.locator.connection system property.
+   */
+  @Before
+  public void setUp() throws Exception
+  {
+    String locatorOverride = System.getProperty("dev.locator.connection");
+    if (locatorOverride != null) {
+      LOCATOR_HOST = locatorOverride;
+    }
+
+    GeodeCheckpointStore newStore = new GeodeCheckpointStore(LOCATOR_HOST);
+    store = newStore;
+    store.setTableName(REGION_NAME);
+    store.connect();
+  }
+  
+  /**
+   * Stores a map under one key and checks that reading it back yields an
+   * equal map.
+   */
+  @Test
+  public void testSave() throws IOException
+  {
+    Map<Integer, String> expected = Maps.newHashMap();
+    expected.put(1, "one");
+    expected.put(2, "two");
+    expected.put(3, "three");
+
+    store.put(KEY1, expected);
+
+    @SuppressWarnings("unchecked")
+    Map<Integer, String> actual = (Map<Integer, String>)store.get(KEY1);
+    Assert.assertEquals("dataOf1", expected, actual);
+  }
+
+  /**
+   * Stores two different maps under two keys and checks that each key reads
+   * back its own map.
+   */
+  @Test
+  public void testLoad() throws IOException
+  {
+    Map<Integer, String> firstMap = Maps.newHashMap();
+    firstMap.put(1, "one");
+    firstMap.put(2, "two");
+    firstMap.put(3, "three");
+
+    Map<Integer, String> secondMap = Maps.newHashMap();
+    secondMap.put(4, "four");
+    secondMap.put(5, "five");
+    secondMap.put(6, "six");
+
+    store.put(KEY1, firstMap);
+    store.put(KEY2, secondMap);
+
+    @SuppressWarnings("unchecked")
+    Map<Integer, String> loadedFirst = (Map<Integer, String>)store.get(KEY1);
+    @SuppressWarnings("unchecked")
+    Map<Integer, String> loadedSecond = (Map<Integer, String>)store.get(KEY2);
+
+    Assert.assertEquals("data of 1", firstMap, loadedFirst);
+    Assert.assertEquals("data of 2", secondMap, loadedSecond);
+  }
+
+  @Test
+  public void testDelete() throws IOException, FunctionDomainException, 
TypeMismatchException, NameResolutionException,
+      QueryInvocationTargetException
+  {
+    testLoad();
+
+    store.remove(KEY1);
+    Assert.assertTrue("operator 2 window 1", (store.get(KEY2) != null));
+    Assert.assertFalse("operator 1 window 1", (store.get(KEY1) != null));
+  }
+
+  /**
+   * Stores values under keys with prefixes 1 and 2 in a freshly named region
+   * and checks that getKeys returns exactly the keys of each prefix.
+   * Not run by JUnit while the Test annotation below stays commented out.
+   */
+  //@Test
+  public void testGetWindowIds() throws IOException
+  {
+    store.disconnect();
+    final String timestampRegion = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date());
+    store.setTableName(timestampRegion);
+    store.connect();
+
+    Map<Integer, String> payload = Maps.newHashMap();
+    payload.put(1, "one");
+    payload.put(2, "two");
+    payload.put(3, "three");
+
+    List<String> expectedOp1Keys = new ArrayList<>();
+    expectedOp1Keys.add("111");
+    expectedOp1Keys.add("112");
+    expectedOp1Keys.add("113");
+    for (String windowKey : expectedOp1Keys) {
+      store.put(windowKey, payload);
+    }
+
+    List<String> expectedOp2Keys = new ArrayList<>();
+    expectedOp2Keys.add("211");
+    expectedOp2Keys.add("212");
+    for (String windowKey : expectedOp2Keys) {
+      store.put(windowKey, payload);
+    }
+
+    List<String> actualOp1Keys = store.getKeys(1);
+    List<String> actualOp2Keys = store.getKeys(2);
+
+    Assert.assertEquals(expectedOp1Keys.size(), actualOp1Keys.size());
+    Assert.assertEquals(expectedOp2Keys.size(), actualOp2Keys.size());
+    Assert.assertTrue(expectedOp1Keys.containsAll(actualOp1Keys));
+    Assert.assertTrue(expectedOp2Keys.containsAll(actualOp2Keys));
+  }
+
+  /**
+   * Disconnects the store after each test.
+   */
+  @After
+  public void tearDown() throws Exception
+  {
+    GeodeCheckpointStoreTest.store.disconnect();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgentTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgentTest.java
 
b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgentTest.java
new file mode 100644
index 0000000..74e5c1c
--- /dev/null
+++ 
b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgentTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.geode;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.gemstone.gemfire.cache.query.FunctionDomainException;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
+import com.gemstone.gemfire.cache.query.TypeMismatchException;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DAG;
+
+/**
+ * Test setup instructions
+ *
+ * Setup a local Geode cluster by starting locator & server using Geode's Gfsh 
shell
+ *
+ * start Gfsh - gemfire-assembly/build/install/apache-geode/bin/gfsh
+ *
+ * gfsh>start locator - gfsh>start locator --name=L1
+ *
+ * gfsh>start server - gfsh>start server --name=S1
+ *
+ * The checkpointing storage agent needs to dynamically create a Geode region per application.
+ *
+ * To be able to create Geode regions programmatically, deploy the server function below through Gfsh
+ *
+ * > jar -cvf geode-fun.jar 
com/datatorrent/contrib/geode/RegionCreateFunction.class
+ *
+ * gfsh> deploy --jar=/tmp/jars/geode-fun.jar
+ *
+ * gfsh> list functions // verify RegionCreateFunction is listed
+ *
+ * gfsh> describe member --name=L1
+ *
+ * Provide the locator details from the above command in LOCATOR_HOST as <locator-host>:<locator-port>
+ */
+public class  GeodeKeyValueStorageAgentTest
+
+{
+  /**
+   * Test rule that creates a storage agent configured with the locator string
+   * before each test, and afterwards disconnects its store and deletes the
+   * per-class directory under target/.
+   */
+  private static class TestMeta extends TestWatcher
+  {
+    String applicationPath;
+    GeodeKeyValueStorageAgent storageAgent;
+    static String LOCATOR_HOST = "localhost:10334";
+    static final String REGION_NAME = "GeodeKeyValueStorageAgentTest";
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+
+      // the locator can be overridden from the command line
+      String locatorOverride = System.getProperty("dev.locator.connection");
+      if (locatorOverride != null) {
+        LOCATOR_HOST = locatorOverride;
+      }
+
+      File classDir = new File("target/" + description.getClassName());
+      try {
+        FileUtils.forceMkdir(classDir);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+
+      Configuration config = new Configuration();
+      config.set(GeodeKeyValueStorageAgent.GEODE_LOCATOR_STRING, LOCATOR_HOST);
+
+      storageAgent = new GeodeKeyValueStorageAgent(config);
+      storageAgent.setApplicationId(REGION_NAME);
+
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        storageAgent.getStore().disconnect();
+        File classDir = new File("target/" + description.getClassName());
+        FileUtils.deleteDirectory(classDir);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  /**
+   * Saves a map for operator 1 / window 1 and checks that loading it back
+   * yields an equal map.
+   */
+  @Test
+  public void testSave() throws IOException
+  {
+    Map<Integer, String> expected = Maps.newHashMap();
+    expected.put(1, "one");
+    expected.put(2, "two");
+    expected.put(3, "three");
+
+    testMeta.storageAgent.save(expected, 1, 1);
+
+    @SuppressWarnings("unchecked")
+    Map<Integer, String> actual = (Map<Integer, String>)testMeta.storageAgent.load(1, 1);
+    Assert.assertEquals("dataOf1", expected, actual);
+  }
+
+  /**
+   * Saves different maps for operators 1 and 2 (window 1) and checks that
+   * each operator loads its own map.
+   */
+  @Test
+  public void testLoad() throws IOException
+  {
+    Map<Integer, String> firstMap = Maps.newHashMap();
+    firstMap.put(1, "one");
+    firstMap.put(2, "two");
+    firstMap.put(3, "three");
+
+    Map<Integer, String> secondMap = Maps.newHashMap();
+    secondMap.put(4, "four");
+    secondMap.put(5, "five");
+    secondMap.put(6, "six");
+
+    testMeta.storageAgent.save(firstMap, 1, 1);
+    testMeta.storageAgent.save(secondMap, 2, 1);
+
+    @SuppressWarnings("unchecked")
+    Map<Integer, String> loadedFirst = (Map<Integer, String>)testMeta.storageAgent.load(1, 1);
+    @SuppressWarnings("unchecked")
+    Map<Integer, String> loadedSecond = (Map<Integer, String>)testMeta.storageAgent.load(2, 1);
+
+    Assert.assertEquals("data of 1", firstMap, loadedFirst);
+    Assert.assertEquals("data of 2", secondMap, loadedSecond);
+  }
+
+  /**
+   * Runs {@link #testSave()}, replaces the storage agent with a newly
+   * constructed one for the same application id, and runs it again.
+   */
+  @Test
+  public void testRecovery() throws IOException
+  {
+    testSave();
+
+    Configuration freshConfig = new Configuration();
+    freshConfig.set(GeodeKeyValueStorageAgent.GEODE_LOCATOR_STRING, TestMeta.LOCATOR_HOST);
+
+    GeodeKeyValueStorageAgent freshAgent = new GeodeKeyValueStorageAgent(freshConfig);
+    testMeta.storageAgent = freshAgent;
+    testMeta.storageAgent.setApplicationId(TestMeta.REGION_NAME);
+
+    testSave();
+  }
+
+  @Test
+  public void testDelete() throws IOException, FunctionDomainException, 
TypeMismatchException, NameResolutionException,
+      QueryInvocationTargetException
+  {
+    testLoad();
+
+    testMeta.storageAgent.delete(1, 1);
+    Assert.assertTrue("operator 2 window 1", (testMeta.storageAgent.load(2, 1) 
!= null));
+    Assert.assertFalse("operator 1 window 1", (testMeta.storageAgent.load(1, 
1) != null));
+  }
+
+  /**
+   * Saves checkpoints for several windows of operators 1 and 2 under a
+   * freshly named application id and checks that getWindowIds returns exactly
+   * the saved window ids per operator.
+   * Not run by JUnit while the Test annotation below stays commented out.
+   */
+  //@Test
+  public void testGetWindowIds() throws IOException
+  {
+    final String timestampRegion = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date());
+    testMeta.storageAgent.setApplicationId(timestampRegion);
+
+    Map<Integer, String> payload = Maps.newHashMap();
+    payload.put(1, "one");
+    payload.put(2, "two");
+    payload.put(3, "three");
+
+    long[] expectedOp1 = {111, 112, 113};
+    for (long windowId : expectedOp1) {
+      testMeta.storageAgent.save(payload, 1, windowId);
+    }
+
+    long[] expectedOp2 = {211, 212};
+    for (long windowId : expectedOp2) {
+      testMeta.storageAgent.save(payload, 2, windowId);
+    }
+
+    Arrays.sort(expectedOp1);
+    Arrays.sort(expectedOp2);
+
+    long[] actualOp1 = testMeta.storageAgent.getWindowIds(1);
+    long[] actualOp2 = testMeta.storageAgent.getWindowIds(2);
+    Arrays.sort(actualOp1);
+    Arrays.sort(actualOp2);
+
+    Assert.assertEquals(expectedOp1.length, actualOp1.length);
+    Assert.assertEquals(expectedOp2.length, actualOp2.length);
+
+    Assert.assertTrue(Arrays.equals(expectedOp1, actualOp1));
+    Assert.assertTrue(Arrays.equals(expectedOp2, actualOp2));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java
 
b/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java
new file mode 100644
index 0000000..520877b
--- /dev/null
+++ 
b/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.lib.util.StorageAgent.ApplicationAwareStorageAgent;
+
+/**
+ * Abstract implementation of {@link ApplicationAwareStorageAgent} which can be
+ * configured be KeyValue store witch implementation of {@link 
StorageAgentKeyValueStore}
+ * 
+ * NOTE - this should be picked from APEX-CORE once below feature is release
+ * https://issues.apache.org/jira/browse/APEXCORE-283
+ * 
+ * @param <S>
+ *          Store implementation
+ */
+public abstract class AbstractKeyValueStorageAgent<S extends 
StorageAgentKeyValueStore>
+    implements ApplicationAwareStorageAgent, Serializable
+{
+
+  protected S store;
+  protected String applicationId;
+  public static final String CHECKPOINT_KEY_SEPARATOR = "-";
+
+  /**
+   * Gets the store
+   *
+   * @return the store
+   */
+  public S getStore()
+  {
+    return store;
+  }
+
+  /**
+   * Sets the store
+   *
+   * @param store
+   */
+  public void setStore(S store)
+  {
+    this.store = store;
+  }
+
+  /**
+   * Return yarn application id of running application
+   * 
+   * @return
+   */
+  public String getApplicationId()
+  {
+    return applicationId;
+  }
+
+  /**
+   * Set yarn application id
+   * 
+   * @param applicationId
+   */
+  public void setApplicationId(String applicationId)
+  {
+    this.applicationId = applicationId;
+  }
+
+  /**
+   * Generates key from operator id and window id to store unique operator
+   * checkpoints
+   * 
+   * @param operatorId
+   * @param windowId
+   * @return unique key for store
+   */
+  public static String generateKey(int operatorId, long windowId)
+  {
+    return String.valueOf(operatorId) + CHECKPOINT_KEY_SEPARATOR + 
String.valueOf(windowId);
+  }
+
+  /**
+   * Stores the given operator object in configured store
+   * 
+   * @param object
+   *          Operator object to store
+   * @param operatorId
+   *          of operator
+   * @param windowId
+   *          window id of operator to checkpoint
+   * 
+   */
+  @Override
+  public void save(Object object, int operatorId, long windowId) throws 
IOException
+  {
+
+    try {
+      store(generateKey(operatorId, windowId), object);
+      logger.debug("saved check point object key {} region {}", 
generateKey(operatorId, windowId), applicationId);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private synchronized void store(String checkpointKey, Object operator) 
throws IOException
+  {
+    if (!getStore().isConnected()) {
+      getStore().connect();
+    }
+    getStore().put(checkpointKey, operator);
+  }
+
+  /**
+   * Retrieves the operator object for given operator & window from configured
+   * store
+   * 
+   * @param operatorId
+   *          of operator
+   * @param windowId
+   *          window id of operator to checkpoint
+   */
+  @Override
+  public Object load(int operatorId, long windowId)
+  {
+    Object obj = null;
+    try {
+      obj = retrieve(generateKey(operatorId, windowId));
+      logger.debug("retrieved object from store  key {} region {} ", 
generateKey(operatorId, windowId), applicationId);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+
+    return obj;
+  }
+
+  private synchronized Object retrieve(String checkpointKey) throws IOException
+  {
+    if (!getStore().isConnected()) {
+      getStore().connect();
+    }
+
+    return getStore().get(checkpointKey);
+  }
+
+  /**
+   * Removes stored operator object for given operatorId & windowId from store
+   * 
+   */
+  @Override
+  public void delete(int operatorId, long windowId) throws IOException
+  {
+
+    if (!getStore().isConnected()) {
+      getStore().connect();
+    }
+
+    try {
+      getStore().remove(generateKey(operatorId, windowId));
+      logger.debug("deleted object from store key {} region {}", 
generateKey(operatorId, windowId));
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+
+  }
+
+  /**
+   * Returns list window id for given operator id for which operator objects 
are
+   * stored but not removed
+   * 
+   */
+  @Override
+  public long[] getWindowIds(int operatorId) throws IOException
+  {
+    if (!getStore().isConnected()) {
+      getStore().connect();
+    }
+
+    List<String> keys = getStore().getKeys(operatorId);
+    if (keys.size() > 0) {
+      long[] windowsIds = new long[keys.size()];
+      int count = 0;
+      for (String key : keys) {
+        windowsIds[count] = extractwindowId(key);
+        count++;
+      }
+      return windowsIds;
+    } else {
+      return new long[0];
+    }
+  }
+
+  public static long extractwindowId(String checkpointKey)
+  {
+    String[] parts = checkpointKey.split(CHECKPOINT_KEY_SEPARATOR);
+    return Long.parseLong(parts[1]);
+  }
+
+  /**
+   * Saves the yarn application id which can be used by create application
+   * specific table/region in KeyValue sore.
+   */
+  @Override
+  public void setApplicationAttributes(AttributeMap map)
+  {
+    this.applicationId = map.get(DAGContext.APPLICATION_ID);
+    getStore().setTableName(applicationId);
+  }
+
+  private static final long serialVersionUID = 7065320156997171116L;
+  private static final Logger logger = 
LoggerFactory.getLogger(AbstractKeyValueStorageAgent.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/library/src/main/java/com/datatorrent/lib/util/StorageAgent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/StorageAgent.java 
b/library/src/main/java/com/datatorrent/lib/util/StorageAgent.java
new file mode 100644
index 0000000..7e2ce67
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/util/StorageAgent.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.io.IOException;
+
+import com.datatorrent.api.Attribute.AttributeMap;
+
+/**
+ * Interface to define writing/reading checkpoint state of any operator.
+ *
+ * @since 0.9.4
+ */
+public interface StorageAgent
+{
+  /**
+   * Save the object so that the same object can be loaded later using the
+   * given combination of operatorId and windowId.
+   *
+   * Typically the storage agent would serialize the state of the object
+   * during the save state. The serialized state can be accessed from anywhere
+   * on the cluster to recreate the object through the load callback.
+   *
+   * @param object - The operator whose state needs to be saved.
+   * @param operatorId - Identifier of the operator.
+   * @param windowId - Identifier for the specific state of the operator.
+   * @throws IOException
+   */
+  public void save(Object object, int operatorId, long windowId) throws 
IOException;
+
+  /**
+   * Load the object that was previously saved for the given combination of
+   * operatorId and windowId.
+   *
+   * @param operatorId Id for which the object was previously saved
+   * @param windowId WindowId for which the object was previously saved
+   * @return object (or a copy of it) which was saved earlier using the save
+   *         call.
+   * @throws IOException
+   */
+  public Object load(int operatorId, long windowId) throws IOException;
+
+  /**
+   * Delete the artifacts related to the save call of the operatorId and the
+   * windowId.
+   *
+   * Through this call, the agent is informed that the object saved against
+   * the operatorId and the windowId together will not be needed again.
+   *
+   * @param operatorId
+   * @param windowId
+   * @throws IOException
+   */
+  public void delete(int operatorId, long windowId) throws IOException;
+
+  /**
+   * Return an array of windowIds for which the object was saved but not deleted.
+   *
+   * The set is essentially the difference between two sets. The first set
+   * contains all the windowIds passed using the successful save calls. The
+   * second set contains all the windowIds passed using the successful delete
+   * calls.
+   *
+   * @param operatorId - The operator for which the state was saved.
+   * @return Collection of windowIds for available states that can be
+   *         retrieved through load.
+   * @throws IOException
+   */
+  public long[] getWindowIds(int operatorId) throws IOException;
+
+  /**
+   * Storage agent that can additionally receive the attributes of the
+   * application.
+   */
+  public interface ApplicationAwareStorageAgent extends StorageAgent
+  {
+   
+    /**
+     * Passes attributes of application to storage agent
+     * 
+     * @param map attributes of application
+     */
+    public void setApplicationAttributes(AttributeMap map);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/377190a7/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java 
b/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java
new file mode 100644
index 0000000..276019d
--- /dev/null
+++ 
b/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.util.List;
+
+import com.datatorrent.lib.db.KeyValueStore;
+
+/**
+ * A {@link KeyValueStore} that can additionally list the keys associated with
+ * a given key and be pointed at a named table/region.
+ */
+public interface StorageAgentKeyValueStore extends KeyValueStore
+{
+
+  /**
+   * Get all the keys associated with the given key
+   *
+   * @param key the key whose associated keys are requested
+   * @return the list of all associated keys
+   */
+  public List<String> getKeys(Object key);
+
+  /**
+   * Set table/region name of store
+   *
+   * @param tableName name of the table/region
+   */
+  public void setTableName(String tableName);
+  
+  
+}


Reply via email to