KYLIN-2627 add simple rollback on ResourceStore

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ea5cabac
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ea5cabac
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ea5cabac

Branch: refs/heads/KYLIN-2606
Commit: ea5cabac95b3d27aa60d6b16263904142e3d2daa
Parents: c9dc7cc
Author: Li Yang <liy...@apache.org>
Authored: Wed May 17 17:46:08 2017 +0800
Committer: Roger Shi <rogershijich...@gmail.com>
Committed: Wed May 17 19:40:34 2017 +0800

----------------------------------------------------------------------
 .../kylin/common/persistence/ResourceStore.java | 103 ++++++++++++++++++-
 .../persistence/LocalFileResourceStoreTest.java |  44 ++++++++
 2 files changed, 145 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ea5cabac/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index d5fbc2e..0565c66 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -18,7 +18,9 @@
 
 package org.apache.kylin.common.persistence;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -27,6 +29,7 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.UUID;
@@ -237,6 +240,11 @@ abstract public class ResourceStore {
     final public void putResource(String resPath, InputStream content, long 
ts) throws IOException {
         resPath = norm(resPath);
         logger.trace("Directly saving resource " + resPath + " (Store " + 
kylinConfig.getMetadataUrl() + ")");
+        putResourceCheckpoint(resPath, content, ts);
+    }
+
+    private void putResourceCheckpoint(String resPath, InputStream content, 
long ts) throws IOException {
+        beforeChange(resPath);
         putResourceImpl(resPath, content, ts);
     }
 
@@ -266,7 +274,7 @@ abstract public class ResourceStore {
             dout.close();
             buf.close();
 
-            newTS = checkAndPutResourceImpl(resPath, buf.toByteArray(), oldTS, 
newTS);
+            newTS = checkAndPutResourceCheckpoint(resPath, buf.toByteArray(), 
oldTS, newTS);
             obj.setLastModified(newTS); // update again the confirmed TS
             return newTS;
         } catch (IOException e) {
@@ -278,6 +286,11 @@ abstract public class ResourceStore {
         }
     }
 
+    private long checkAndPutResourceCheckpoint(String resPath, byte[] content, 
long oldTS, long newTS) throws IOException {
+        beforeChange(resPath);
+        return checkAndPutResourceImpl(resPath, content, oldTS, newTS);
+    }
+
     /**
      * checks old timestamp when overwriting existing
      */
@@ -288,7 +301,12 @@ abstract public class ResourceStore {
      */
     final public void deleteResource(String resPath) throws IOException {
         logger.trace("Deleting resource " + resPath + " (Store " + 
kylinConfig.getMetadataUrl() + ")");
-        deleteResourceImpl(norm(resPath));
+        deleteResourceCheckpoint(norm(resPath));
+    }
+
+    private void deleteResourceCheckpoint(String resPath) throws IOException {
+        beforeChange(resPath);
+        deleteResourceImpl(resPath);
     }
 
     abstract protected void deleteResourceImpl(String resPath) throws 
IOException;
@@ -315,6 +333,87 @@ abstract public class ResourceStore {
 
     // 
============================================================================
 
+    ThreadLocal<Checkpoint> checkpointing = new ThreadLocal<>();
+
+    public Checkpoint checkpoint() {
+        Checkpoint cp = checkpointing.get();
+        if (cp != null)
+            throw new IllegalStateException("A checkpoint has been open for 
this thread: " + cp);
+
+        cp = new Checkpoint();
+        checkpointing.set(cp);
+        return cp;
+    }
+
+    private void beforeChange(String resPath) throws IOException {
+        Checkpoint cp = checkpointing.get();
+        if (cp != null)
+            cp.beforeChange(resPath);
+    }
+
+    public class Checkpoint implements Closeable {
+
+        LinkedHashMap<String, byte[]> origResData = new LinkedHashMap<>();
+        LinkedHashMap<String, Long> origResTimestamp = new LinkedHashMap<>();
+
+        private void beforeChange(String resPath) throws IOException {
+            if (origResData.containsKey(resPath))
+                return;
+
+            RawResource raw = getResourceImpl(resPath);
+            if (raw == null) {
+                origResData.put(resPath, null);
+                origResTimestamp.put(resPath, null);
+            } else {
+                origResData.put(resPath, readAll(raw.inputStream));
+                origResTimestamp.put(resPath, raw.timestamp);
+            }
+        }
+
+        private byte[] readAll(InputStream inputStream) throws IOException {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            IOUtils.copy(inputStream, out);
+            inputStream.close();
+            out.close();
+            return out.toByteArray();
+        }
+
+        public void rollback() {
+            checkThread();
+
+            for (String resPath : origResData.keySet()) {
+                logger.debug("Rollbacking " + resPath);
+                try {
+                    byte[] data = origResData.get(resPath);
+                    Long ts = origResTimestamp.get(resPath);
+                    if (data == null || ts == null)
+                        deleteResourceImpl(resPath);
+                    else
+                        putResourceImpl(resPath, new 
ByteArrayInputStream(data), ts);
+                } catch (IOException ex) {
+                    logger.error("Failed to rollback " + resPath, ex);
+                }
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            checkThread();
+
+            origResData = null;
+            origResTimestamp = null;
+            checkpointing.set(null);
+        }
+
+        private void checkThread() {
+            Checkpoint cp = checkpointing.get();
+            if (this != cp)
+                throw new IllegalStateException();
+        }
+    }
+
+    // 
============================================================================
+
     public static interface Visitor {
         void visit(String path) throws IOException;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ea5cabac/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java
 
b/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java
index 17b608d..aca4a0a 100644
--- 
a/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java
+++ 
b/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java
@@ -18,7 +18,12 @@
 
 package org.apache.kylin.common.persistence;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore.Checkpoint;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.junit.After;
 import org.junit.Before;
@@ -41,4 +46,43 @@ public class LocalFileResourceStoreTest extends 
LocalFileMetadataTestCase {
         ResourceStoreTest.testAStore("", KylinConfig.getInstanceFromEnv());
     }
 
+    @Test
+    public void testRollback() throws Exception {
+        ResourceStore store = 
ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+        byte[] bytes = new byte[] { 0, 1, 2 };
+        RawResource raw;
+        Checkpoint cp;
+
+        cp = store.checkpoint();
+        try {
+            store.putResource("/res1", new StringEntity("data1"), 1000, 
StringEntity.serializer);
+        } finally {
+            cp.close();
+        }
+        StringEntity str = store.getResource("/res1", StringEntity.class, 
StringEntity.serializer);
+        assertEquals("data1", str.toString());
+
+        cp = store.checkpoint();
+        try {
+            ByteArrayInputStream is = new ByteArrayInputStream(bytes);
+            store.putResource("/res2", is, 2000);
+            is.close();
+            
+            store.putResource("/res1", str, 2000, StringEntity.serializer);
+            store.deleteResource("/res1");
+
+            assertEquals(null, store.getResource("/res1"));
+            assertEquals(2000, (raw = store.getResource("/res2")).timestamp);
+            raw.inputStream.close();
+            
+            cp.rollback();
+            
+            assertEquals(null, store.getResource("/res2"));
+            assertEquals(1000, (raw = store.getResource("/res1")).timestamp);
+            raw.inputStream.close();
+        } finally {
+            cp.close();
+        }
+    }
+
 }

Reply via email to