Repository: asterixdb
Updated Branches:
  refs/heads/master 8076fe97d -> 5aeba9b47


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
index 315496d..e38514d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
@@ -41,6 +41,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import 
org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 import org.apache.hyracks.util.trace.ITraceCategoryRegistry;
 import org.apache.hyracks.util.trace.ITracer;
 import org.apache.hyracks.util.trace.TraceCategoryRegistry;
@@ -99,13 +100,16 @@ public final class LSMBTreeTestContext extends 
OrderedIndexTestContext {
             lsmTree = LSMBTreeUtil.createLSMTree(ioManager, 
virtualBufferCaches, file, diskBufferCache, typeTraits,
                     cmpFactories, bloomFilterKeyFields, 
bloomFilterFalsePositiveRate, mergePolicy, opTracker,
                     ioScheduler, ioOpCallbackFactory, needKeyDupCheck, 
filterTypeTraits, filterCmp, btreefields,
-                    filterfields, true, metadataPageManagerFactory, 
updateAware, ITracer.NONE);
+                    filterfields, true, metadataPageManagerFactory, 
updateAware, ITracer.NONE,
+                    NoOpCompressorDecompressorFactory.INSTANCE);
         } else {
             lsmTree = LSMBTreeUtil.createLSMTree(ioManager, 
virtualBufferCaches, file, diskBufferCache, typeTraits,
                     cmpFactories, bloomFilterKeyFields, 
bloomFilterFalsePositiveRate, mergePolicy, opTracker,
                     ioScheduler, ioOpCallbackFactory, needKeyDupCheck, null, 
null, null, null, true,
-                    metadataPageManagerFactory, updateAware, new 
Tracer(LSMBTreeTestContext.class.getSimpleName(),
-                            ITraceCategoryRegistry.CATEGORIES_ALL, new 
TraceCategoryRegistry()));
+                    metadataPageManagerFactory,
+                    updateAware, new 
Tracer(LSMBTreeTestContext.class.getSimpleName(),
+                            ITraceCategoryRegistry.CATEGORIES_ALL, new 
TraceCategoryRegistry()),
+                    NoOpCompressorDecompressorFactory.INSTANCE);
         }
         LSMBTreeTestContext testCtx = new LSMBTreeTestContext(fieldSerdes, 
lsmTree, filtered);
         return testCtx;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheWithCompressionTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheWithCompressionTest.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheWithCompressionTest.java
new file mode 100644
index 0000000..a978bc5
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheWithCompressionTest.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.HaltOnFailureCallback;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import 
org.apache.hyracks.storage.common.compression.SnappyCompressorDecompressorFactory;
+import 
org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
+import 
org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
+import org.apache.hyracks.test.support.TestUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BufferCacheWithCompressionTest {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final List<String> openFiles = new ArrayList<>();
+    private static final SimpleDateFormat simpleDateFormat = new 
SimpleDateFormat("ddMMyy-hhmmssSS");
+
+    private static final ICompressorDecompressor compDecomp =
+            (new SnappyCompressorDecompressorFactory()).createInstance();
+    private static final int PAGE_SIZE = 256;
+    private static final int NUM_PAGES = 10;
+    private static final int MAX_OPEN_DATA_FILES = 20;
+    //Additional file (LAF) for each compressed file
+    private static final int ACTUAL_MAX_OPEN_FILE = MAX_OPEN_DATA_FILES * 2;
+    private static final int HYRACKS_FRAME_SIZE = PAGE_SIZE;
+    private IHyracksTaskContext ctx = TestUtils.create(HYRACKS_FRAME_SIZE);
+
+    private static final Random rnd = new Random(50);
+
+    private FileReference getFileReference(IIOManager ioManager) throws 
HyracksDataException {
+        String fileName = simpleDateFormat.format(new Date()) + 
openFiles.size();
+        final FileReference fileRef = ioManager.resolve(fileName);
+        final CompressedFileReference cFileRef = new 
CompressedFileReference(fileRef.getDeviceHandle(), compDecomp,
+                fileRef.getRelativePath(), fileRef.getRelativePath() + ".dic");
+
+        openFiles.add(fileName);
+        openFiles.add(cFileRef.getLAFRelativePath());
+        return cFileRef;
+    }
+
+    @Test
+    public void interruptPinTest() throws Exception {
+        final int bufferCacheNumPages = 4;
+        TestStorageManagerComponentHolder.init(PAGE_SIZE, bufferCacheNumPages, 
ACTUAL_MAX_OPEN_FILE);
+        IIOManager ioManager = 
TestStorageManagerComponentHolder.getIOManager();
+        IBufferCache bufferCache =
+                
TestStorageManagerComponentHolder.getBufferCache(ctx.getJobletContext().getServiceContext());
+        final long duration = TimeUnit.SECONDS.toMillis(20);
+        final FileReference file = getFileReference(ioManager);
+        final int fileId = bufferCache.createFile(file);
+        final int numPages = 16;
+        bufferCache.openFile(fileId);
+        final ICompressedPageWriter writer = 
bufferCache.getCompressedPageWriter(fileId);
+        final IFIFOPageQueue queue = bufferCache.createFIFOQueue();
+        for (int i = 0; i < numPages; i++) {
+            long dpid = BufferedFileHandle.getDiskPageId(fileId, i);
+            ICachedPage page = bufferCache.confiscatePage(dpid);
+            writer.prepareWrite(page);
+            page.getBuffer().putInt(0, i);
+            queue.put(page, HaltOnFailureCallback.INSTANCE);
+        }
+        bufferCache.finishQueue();
+        writer.endWriting();
+        bufferCache.closeFile(fileId);
+        ExecutorService executor = 
Executors.newFixedThreadPool(bufferCacheNumPages);
+        MutableObject<Thread>[] readers = new 
MutableObject[bufferCacheNumPages];
+        Future<Void>[] futures = new Future[bufferCacheNumPages];
+        for (int i = 0; i < bufferCacheNumPages; i++) {
+            readers[i] = new MutableObject<>();
+            final int threadNumber = i;
+            futures[i] = executor.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    synchronized (readers[threadNumber]) {
+                        readers[threadNumber].setValue(Thread.currentThread());
+                        readers[threadNumber].notifyAll();
+                    }
+                    // for duration, just read the pages one by one.
+                    // At the end, close the file
+                    bufferCache.openFile(fileId);
+                    final long start = System.currentTimeMillis();
+                    int pageNumber = 0;
+                    int totalReads = 0;
+                    int successfulReads = 0;
+                    int interruptedReads = 0;
+                    while (System.currentTimeMillis() - start < duration) {
+                        totalReads++;
+                        pageNumber = (pageNumber + 1) % numPages;
+                        try {
+                            long dpid = 
BufferedFileHandle.getDiskPageId(fileId, pageNumber);
+                            ICachedPage page = bufferCache.pin(dpid, false);
+                            successfulReads++;
+                            bufferCache.unpin(page);
+                        } catch (HyracksDataException hde) {
+                            interruptedReads++;
+                            // clear
+                            Thread.interrupted();
+                        }
+                    }
+                    bufferCache.closeFile(fileId);
+                    LOGGER.log(Level.INFO, "Total reads = " + totalReads + " 
Successful Reads = " + successfulReads
+                            + " Interrupted Reads = " + interruptedReads);
+                    return null;
+                }
+            });
+        }
+
+        for (int i = 0; i < bufferCacheNumPages; i++) {
+            synchronized (readers[i]) {
+                while (readers[i].getValue() == null) {
+                    readers[i].wait();
+                }
+            }
+        }
+        final long start = System.currentTimeMillis();
+
+        while (System.currentTimeMillis() - start < duration) {
+            for (int i = 0; i < bufferCacheNumPages; i++) {
+                readers[i].getValue().interrupt();
+            }
+            Thread.sleep(25); // NOSONAR Sleep so some reads are successful
+        }
+        try {
+            for (int i = 0; i < bufferCacheNumPages; i++) {
+                futures[i].get();
+            }
+        } finally {
+            bufferCache.deleteFile(fileId);
+            bufferCache.close();
+        }
+    }
+
+    /**
+     * Compressed files are immutable.
+     */
+    @Test
+    public void simpleOpenPinCloseTest() throws HyracksException {
+        TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, 
ACTUAL_MAX_OPEN_FILE);
+        IBufferCache bufferCache =
+                
TestStorageManagerComponentHolder.getBufferCache(ctx.getJobletContext().getServiceContext());
+
+        IIOManager ioManager = 
TestStorageManagerComponentHolder.getIOManager();
+        FileReference file = getFileReference(ioManager);
+        int fileId = bufferCache.createFile(file);
+        int num = 10;
+        int testPageId = 0;
+
+        bufferCache.openFile(fileId);
+        final ICompressedPageWriter writer = 
bufferCache.getCompressedPageWriter(fileId);
+
+        ICachedPage page = null;
+
+        // confiscating a page should succeed
+        page = 
bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, 
testPageId));
+        writer.prepareWrite(page);
+
+        for (int i = 0; i < num; i++) {
+            page.getBuffer().putInt(i * 4, i);
+        }
+        final IFIFOPageQueue queue = bufferCache.createFIFOQueue();
+        queue.put(page, HaltOnFailureCallback.INSTANCE);
+        bufferCache.finishQueue();
+        writer.endWriting();
+        bufferCache.closeFile(fileId);
+
+        // open file again
+        bufferCache.openFile(fileId);
+
+        // tryPin should succeed because page should still be cached
+        page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, 
testPageId), false);
+        Assert.assertNotNull(page);
+        try {
+            // verify contents of page
+            for (int i = 0; i < num; i++) {
+                Assert.assertEquals(page.getBuffer().getInt(i * 4), i);
+            }
+        } finally {
+            bufferCache.unpin(page);
+        }
+
+        bufferCache.closeFile(fileId);
+        bufferCache.close();
+    }
+
+    @Test
+    public void contentCheckingMaxOpenFilesTest() throws HyracksException {
+        TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, 
ACTUAL_MAX_OPEN_FILE);
+        IBufferCache bufferCache =
+                
TestStorageManagerComponentHolder.getBufferCache(ctx.getJobletContext().getServiceContext());
+        IIOManager ioManager = 
TestStorageManagerComponentHolder.getIOManager();
+
+        List<Integer> fileIds = new ArrayList<>();
+        Map<Integer, ArrayList<Integer>> pageContents = new HashMap<>();
+        int num = 10;
+        int testPageId = 0;
+
+        // open max number of files and write some stuff into their first page
+        for (int i = 0; i < MAX_OPEN_DATA_FILES; i++) {
+            FileReference file = getFileReference(ioManager);
+            int fileId = bufferCache.createFile(file);
+            fileIds.add(fileId);
+            bufferCache.openFile(fileId);
+            final ICompressedPageWriter writer = 
bufferCache.getCompressedPageWriter(fileId);
+            ICachedPage page = null;
+            page = 
bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, 
testPageId));
+            writer.prepareWrite(page);
+            ArrayList<Integer> values = new ArrayList<>();
+            for (int j = 0; j < num; j++) {
+                int x = Math.abs(rnd.nextInt());
+                page.getBuffer().putInt(j * 4, x);
+                values.add(x);
+            }
+            pageContents.put(fileId, values);
+            final IFIFOPageQueue queue = bufferCache.createFIFOQueue();
+            queue.put(page, HaltOnFailureCallback.INSTANCE);
+            bufferCache.finishQueue();
+            writer.endWriting();
+        }
+
+        boolean exceptionThrown = false;
+
+        // since all files are open, next open should fail
+        try {
+            FileReference file = getFileReference(ioManager);
+            int fileId = bufferCache.createFile(file);
+            bufferCache.openFile(fileId);
+        } catch (HyracksDataException e) {
+            exceptionThrown = true;
+        }
+        Assert.assertTrue(exceptionThrown);
+
+        // close a few random files
+        ArrayList<Integer> closedFileIds = new ArrayList<>();
+        int filesToClose = 5;
+        for (int i = 0; i < filesToClose; i++) {
+            int ix = Math.abs(rnd.nextInt()) % fileIds.size();
+            bufferCache.closeFile(fileIds.get(ix));
+            closedFileIds.add(fileIds.get(ix));
+            fileIds.remove(ix);
+        }
+
+        // now open a few new files
+        for (int i = 0; i < filesToClose; i++) {
+            FileReference file = getFileReference(ioManager);
+            int fileId = bufferCache.createFile(file);
+            bufferCache.openFile(fileId);
+            fileIds.add(fileId);
+        }
+
+        // since all files are open, next open should fail
+        exceptionThrown = false;
+        try {
+            FileReference file = getFileReference(ioManager);
+            int fileId = bufferCache.createFile(file);
+            bufferCache.openFile(fileId);
+        } catch (HyracksDataException e) {
+            exceptionThrown = true;
+        }
+        Assert.assertTrue(exceptionThrown);
+
+        // close a few random files again
+        for (int i = 0; i < filesToClose; i++) {
+            int ix = Math.abs(rnd.nextInt()) % fileIds.size();
+            bufferCache.closeFile(fileIds.get(ix));
+            closedFileIds.add(fileIds.get(ix));
+            fileIds.remove(ix);
+        }
+
+        // now open those closed files again and verify their contents
+        for (int i = 0; i < filesToClose; i++) {
+            int closedFileId = closedFileIds.get(i);
+            bufferCache.openFile(closedFileId);
+            fileIds.add(closedFileId);
+
+            // pin first page and verify contents
+            ICachedPage page = null;
+            page = 
bufferCache.pin(BufferedFileHandle.getDiskPageId(closedFileId, testPageId), 
false);
+            try {
+                ArrayList<Integer> values = pageContents.get(closedFileId);
+                for (int j = 0; j < values.size(); j++) {
+                    Assert.assertEquals(values.get(j).intValue(), 
page.getBuffer().getInt(j * 4));
+                }
+            } finally {
+                bufferCache.unpin(page);
+            }
+        }
+
+        for (Integer i : fileIds) {
+            bufferCache.closeFile(i.intValue());
+        }
+
+        bufferCache.close();
+    }
+
+    @Test
+    public void interruptedConcurrentReadTest() throws Exception {
+        TestStorageManagerComponentHolder.init(PAGE_SIZE, 200, 
ACTUAL_MAX_OPEN_FILE);
+        IBufferCache bufferCache =
+                
TestStorageManagerComponentHolder.getBufferCache(ctx.getJobletContext().getServiceContext());
+        IIOManager ioManager = 
TestStorageManagerComponentHolder.getIOManager();
+        FileReference file = getFileReference(ioManager);
+        int fileId = bufferCache.createFile(file);
+        int testPageId = 0;
+        bufferCache.openFile(fileId);
+        bufferCache.getCompressedPageWriter(fileId).endWriting();
+
+        final int expectedPinCount = 100;
+        final AtomicInteger actualPinCount = new AtomicInteger(0);
+        Thread innocentReader = new Thread(() -> {
+            Thread interruptedReader = null;
+            try {
+                for (int i = 0; i < expectedPinCount; i++) {
+                    ICachedPage aPage = 
bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, testPageId), false);
+                    bufferCache.unpin(aPage);
+                    ((CachedPage) aPage).invalidate();
+                    actualPinCount.incrementAndGet();
+                    if (i % 10 == 0) {
+                        // start an interruptedReader that will cause the 
channel to closed
+                        interruptedReader = new Thread(() -> {
+                            try {
+                                Thread.currentThread().interrupt();
+                                
bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, testPageId + 1), 
false);
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                            }
+                        });
+                        interruptedReader.start();
+                    }
+                }
+                if (interruptedReader != null) {
+                    interruptedReader.join();
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+        innocentReader.start();
+        innocentReader.join();
+        // make sure that all reads by the innocentReader succeeded
+        Assert.assertEquals(actualPinCount.get(), expectedPinCount);
+        // close file
+        bufferCache.closeFile(fileId);
+    }
+
+    @AfterClass
+    public static void cleanup() throws Exception {
+        for (String s : openFiles) {
+            File f = new File(s);
+            f.deleteOnExit();
+        }
+    }
+
+}

Reply via email to