[NO ISSUE][STO] Fix memory leaks in storage

- user model changes: no
- storage format changes: no
- interface changes: yes
  - Added javadocs to:
  -- IBufferCache
  -- IExtraPageBlockHelper
  - Moved IBufferCache.setPageDiskId -> ICachedPage.setDiskPageId
  - Renamed:
  -- IBufferCache.flushDirtyPage -> IBufferCache.flush
  -- IBufferCache.getNumPages -> IBufferCache.getPageBudget
  - Removed:
  -- IBufferCache.adviseWontNeed [not used]
  -- IBufferCache.tryPin [not used]

details:
- Previously, when adding a kv pair to the metadata of a memory
  component, we add a new Pair item to the ArrayList. After
  this change, we only update it if it exists.
- VirtualBufferCache used to leak pages when reclaiming pages
  of a file after deletion. This has also been fixed.
- New tests for VirtualBufferCache added:
  - Checks for memory budget after end of testDisjointPins
  - Concurrent Users pinning pages concurrently
  - Test for large pages and ensuring allocated large
    pages are accounted for through removal of cached
    free pages.

Change-Id: I4ae9736c9b5fdba5795245bdf835c023e3f73b15
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2115
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/fbf3c0a9
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/fbf3c0a9
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/fbf3c0a9

Branch: refs/heads/master
Commit: fbf3c0a9718941122513ee2ae195e3a6f9df661e
Parents: c04046c
Author: Abdullah Alamoudi <[email protected]>
Authored: Thu Nov 2 19:48:09 2017 -0700
Committer: Michael Blow <[email protected]>
Committed: Fri Nov 3 08:23:18 2017 -0700

----------------------------------------------------------------------
 .../active/SingleThreadEventProcessor.java      |  84 -----
 .../app/active/ActiveNotificationHandler.java   |   2 +-
 .../resources/asterix-build-configuration.xml   |   2 +-
 .../app/bootstrap/TestNodeController.java       | 136 ++++++--
 .../org/apache/asterix/test/active/Actor.java   |   2 +-
 .../test/dataflow/ComponentRollbackTest.java    |  38 +--
 .../asterix/test/dataflow/LogMarkerTest.java    |  17 +-
 .../asterix/test/logging/CheckpointingTest.java |  17 +-
 .../asterix/test/storage/DiskIsFullTest.java    |  27 +-
 .../asterix/common/exceptions/ErrorCode.java    |   1 -
 .../main/resources/asx_errormsg/en.properties   |   1 -
 .../hyracks/api/exceptions/ErrorCode.java       |   4 +
 .../api/util/SingleThreadEventProcessor.java    |  83 +++++
 .../src/main/resources/errormsg/en.properties   |   4 +
 .../hyracks/data/std/api/AbstractPointable.java |  13 -
 .../apache/hyracks/data/std/util/DataUtils.java |  59 ++++
 .../hyracks/storage/am/btree/impls/BTree.java   |  10 +-
 .../AppendOnlyLinkedMetadataPageManager.java    |   8 +-
 .../freepage/LinkedMetaDataPageManager.java     |   6 +-
 .../am/common/impls/AbstractTreeIndex.java      |   5 +
 .../common/impls/AbstractLSMDiskComponent.java  |   5 +
 .../impls/AbstractLSMMemoryComponent.java       |   2 +-
 .../lsm/common/impls/DiskComponentMetadata.java |   2 +-
 .../common/impls/MemoryComponentMetadata.java   |   9 +-
 .../impls/MultitenantVirtualBufferCache.java    |  29 +-
 .../lsm/common/impls/NoMergePolicyFactory.java  |   4 +-
 .../am/lsm/common/impls/VirtualBufferCache.java | 298 +++++++++--------
 .../inmemory/InMemoryInvertedIndex.java         |   2 +-
 .../hyracks/storage/am/rtree/impls/RTree.java   |   7 +-
 .../storage/common/buffercache/BufferCache.java |  63 +---
 .../storage/common/buffercache/CachedPage.java  |  14 +-
 .../common/buffercache/DebugBufferCache.java    |  24 +-
 .../common/buffercache/FIFOLocalWriter.java     |  12 +-
 .../common/buffercache/IBufferCache.java        | 153 ++++++++-
 .../storage/common/buffercache/ICachedPage.java |   9 +
 .../buffercache/IExtraPageBlockHelper.java      |  19 ++
 .../common/buffercache/IFIFOPageQueue.java      |   3 +-
 .../storage/common/buffercache/VirtualPage.java |  10 +
 .../lsm/common/test/VirtualBufferCacheTest.java | 320 ++++++++++++++++---
 .../hyracks/storage/common/BufferCacheTest.java |  32 +-
 40 files changed, 989 insertions(+), 547 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
deleted file mode 100644
index de6682d..0000000
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public abstract class SingleThreadEventProcessor<T> implements Runnable {
-
-    private static final Logger LOGGER = 
Logger.getLogger(SingleThreadEventProcessor.class.getName());
-    private final String name;
-    private final LinkedBlockingQueue<T> eventInbox;
-    private volatile Thread executorThread;
-    private volatile boolean stopped = false;
-
-    public SingleThreadEventProcessor(String threadName) {
-        this.name = threadName;
-        eventInbox = new LinkedBlockingQueue<>();
-        executorThread = new Thread(this, threadName);
-        executorThread.start();
-    }
-
-    @Override
-    public final void run() {
-        LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName());
-        while (!stopped) {
-            try {
-                T event = eventInbox.take();
-                handle(event);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            } catch (Exception e) {
-                LOGGER.log(Level.SEVERE, "Error handling an event", e);
-            }
-        }
-        LOGGER.log(Level.WARNING, "Stopped " + 
Thread.currentThread().getName());
-    }
-
-    protected abstract void handle(T event) throws Exception; //NOSONAR
-
-    public void add(T event) {
-        if (!eventInbox.add(event)) {
-            throw new IllegalStateException();
-        }
-    }
-
-    public void stop() throws HyracksDataException, InterruptedException {
-        stopped = true;
-        executorThread.interrupt();
-        executorThread.join(1000);
-        int attempt = 0;
-        while (executorThread.isAlive()) {
-            attempt++;
-            LOGGER.log(Level.WARNING,
-                    "Failed to stop event processor after " + attempt + " 
attempts. Interrupted exception swallowed?");
-            if (attempt == 10) {
-                throw new 
RuntimeDataException(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
-            }
-            executorThread.interrupt();
-            executorThread.join(1000);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index cbcc44f..d13a15d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -31,7 +31,6 @@ import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.IActiveNotificationHandler;
-import org.apache.asterix.active.SingleThreadEventProcessor;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -48,6 +47,7 @@ import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
 
 public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<ActiveEvent>
         implements IActiveNotificationHandler, IJobLifecycleListener {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml 
b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index ce7eb3d..7eba9eb 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -77,7 +77,7 @@
   </property>
   <property>
     <name>storage.memorycomponent.numpages</name>
-    <value>8</value>
+    <value>16</value>
     <description>The number of pages to allocate for a memory component.
       This budget is shared by all the memory components of the primary
       index and all its secondary indexes across all I/O devices on a node.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 104f80b..b155b51 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.app.bootstrap;
 
 import java.io.File;
+import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -36,20 +37,27 @@ import 
org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.context.TransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import 
org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
 import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
@@ -81,11 +89,13 @@ import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescripto
 import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
 import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import 
org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.apache.hyracks.storage.common.IResourceFactory;
@@ -113,9 +123,7 @@ public class TestNodeController {
     public static final double BLOOM_FILTER_FALSE_POSITIVE_RATE = 0.01;
     public static final TransactionSubsystemProvider TXN_SUBSYSTEM_PROVIDER = 
TransactionSubsystemProvider.INSTANCE;
     // Mutables
-    private JobId jobId;
     private long jobCounter = 0L;
-    private IHyracksJobletContext jobletCtx;
     private final String testConfigFileName;
     private final boolean runHDFS;
 
@@ -137,9 +145,6 @@ public class TestNodeController {
             th.printStackTrace();
             throw th;
         }
-        jobletCtx = Mockito.mock(IHyracksJobletContext.class);
-        
Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
-        Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
     }
 
     public void deInit() throws Exception {
@@ -147,20 +152,24 @@ public class TestNodeController {
         ExecutionTestUtil.tearDown(cleanupOnStop);
     }
 
-    public org.apache.asterix.common.transactions.JobId getTxnJobId() {
-        return new org.apache.asterix.common.transactions.JobId((int) 
jobId.getId());
+    public org.apache.asterix.common.transactions.JobId 
getTxnJobId(IHyracksTaskContext ctx) {
+        return new org.apache.asterix.common.transactions.JobId((int) 
ctx.getJobletContext().getJobId().getId());
     }
 
     public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> 
getInsertPipeline(IHyracksTaskContext ctx,
-            Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, 
ARecordType metaType,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> 
mergePolicyProperties, int[] filterFields,
+            Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, 
ARecordType metaType, int[] filterFields,
             int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
-            StorageComponentProvider storageComponentProvider) throws 
AlgebricksException, HyracksDataException {
+            StorageComponentProvider storageComponentProvider)
+            throws AlgebricksException, HyracksDataException, RemoteException, 
ACIDException {
+        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        
org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, 
Map<String, String>> mergePolicy =
+                DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, 
primaryKeyTypes, recordType, metaType,
-                mergePolicyFactory, mergePolicyProperties, filterFields, 
primaryKeyIndexes, primaryKeyIndicators);
+                mergePolicy.first, mergePolicy.second, filterFields, 
primaryKeyIndexes, primaryKeyIndicators);
         IndexOperation op = IndexOperation.INSERT;
         IModificationOperationCallbackFactory modOpCallbackFactory =
-                new 
PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), 
dataset.getDatasetId(),
+                new 
PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(ctx), 
dataset.getDatasetId(),
                         primaryIndexInfo.primaryKeyIndexes, 
TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
                         ResourceType.LSM_BTREE);
         IRecordDescriptorProvider recordDescProvider = 
primaryIndexInfo.getInsertRecordDescriptorProvider();
@@ -170,7 +179,7 @@ public class TestNodeController {
                 primaryIndexInfo.primaryIndexInsertFieldsPermutations,
                 recordDescProvider.getInputRecordDescriptor(new ActivityId(new 
OperatorDescriptorId(0), 0), 0), op,
                 true, indexHelperFactory, modOpCallbackFactory, null);
-        CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), 
dataset.getDatasetId(),
+        CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), 
dataset.getDatasetId(),
                 primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, 
true);
         insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
         commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
@@ -204,8 +213,7 @@ public class TestNodeController {
     }
 
     public JobId newJobId() {
-        jobId = new JobId(jobCounter++);
-        return jobId;
+        return new JobId(jobCounter++);
     }
 
     public IResourceFactory getPrimaryResourceFactory(IHyracksTaskContext ctx, 
PrimaryIndexInfo primaryIndexInfo,
@@ -225,18 +233,22 @@ public class TestNodeController {
     }
 
     public PrimaryIndexInfo createPrimaryIndex(Dataset dataset, IAType[] 
primaryKeyTypes, ARecordType recordType,
-            ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, 
Map<String, String> mergePolicyProperties,
-            int[] filterFields, IStorageComponentProvider 
storageComponentProvider, int[] primaryKeyIndexes,
-            List<Integer> primaryKeyIndicators) throws AlgebricksException, 
HyracksDataException {
+            ARecordType metaType, int[] filterFields, 
IStorageComponentProvider storageComponentProvider,
+            int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators)
+            throws AlgebricksException, HyracksDataException, RemoteException, 
ACIDException {
+        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        
org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, 
Map<String, String>> mergePolicy =
+                DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, 
primaryKeyTypes, recordType, metaType,
-                mergePolicyFactory, mergePolicyProperties, filterFields, 
primaryKeyIndexes, primaryKeyIndicators);
+                mergePolicy.first, mergePolicy.second, filterFields, 
primaryKeyIndexes, primaryKeyIndicators);
         Dataverse dataverse = new Dataverse(dataset.getDataverseName(), 
NonTaggedDataFormat.class.getName(),
                 MetadataUtil.PENDING_NO_OP);
         MetadataProvider mdProvider = new MetadataProvider(
                 (ICcApplicationContext) 
ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), dataverse);
         try {
             IResourceFactory resourceFactory = 
dataset.getResourceFactory(mdProvider, primaryIndexInfo.index,
-                    recordType, metaType, mergePolicyFactory, 
mergePolicyProperties);
+                    recordType, metaType, mergePolicy.first, 
mergePolicy.second);
             IndexBuilderFactory indexBuilderFactory =
                     new 
IndexBuilderFactory(storageComponentProvider.getStorageManager(),
                             primaryIndexInfo.getFileSplitProvider(), 
resourceFactory, !dataset.isTemp());
@@ -283,6 +295,10 @@ public class TestNodeController {
         if (withMessaging) {
             TaskUtil.put(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), 
ctx);
         }
+        JobId jobId = newJobId();
+        IHyracksJobletContext jobletCtx = 
Mockito.mock(IHyracksJobletContext.class);
+        
Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
+        Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
         ctx = Mockito.spy(ctx);
         Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
         
Mockito.when(ctx.getIoManager()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getIoManager());
@@ -410,4 +426,84 @@ public class TestNodeController {
                 (CcApplicationContext) 
ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
         return appCtx.getStorageManager();
     }
+
+    public Pair<LSMPrimaryUpsertOperatorNodePushable, CommitRuntime> 
getUpsertPipeline(IHyracksTaskContext ctx,
+            Dataset dataset, IAType[] keyTypes, ARecordType recordType, 
ARecordType metaType, int[] filterFields,
+            int[] keyIndexes, List<Integer> keyIndicators, 
StorageComponentProvider storageComponentProvider,
+            IFrameOperationCallbackFactory frameOpCallbackFactory, boolean 
hasSecondaries) throws Exception {
+        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        
org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, 
Map<String, String>> mergePolicy =
+                DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, 
keyTypes, recordType, metaType,
+                mergePolicy.first, mergePolicy.second, filterFields, 
keyIndexes, keyIndicators);
+        IModificationOperationCallbackFactory modificationCallbackFactory = 
dataset.getModificationCallbackFactory(
+                storageComponentProvider, primaryIndexInfo.index, 
getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+        ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
+                storageComponentProvider, primaryIndexInfo.index, 
getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+        IRecordDescriptorProvider recordDescProvider = 
primaryIndexInfo.getInsertRecordDescriptorProvider();
+        IIndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(
+                storageComponentProvider.getStorageManager(), 
primaryIndexInfo.getFileSplitProvider());
+        LSMPrimaryUpsertOperatorNodePushable insertOp = new 
LSMPrimaryUpsertOperatorNodePushable(ctx, PARTITION,
+                indexHelperFactory, 
primaryIndexInfo.primaryIndexInsertFieldsPermutations,
+                recordDescProvider.getInputRecordDescriptor(new ActivityId(new 
OperatorDescriptorId(0), 0), 0),
+                modificationCallbackFactory, searchCallbackFactory, 
keyIndexes.length, recordType, -1,
+                frameOpCallbackFactory == null ? 
dataset.getFrameOpCallbackFactory() : frameOpCallbackFactory,
+                MissingWriterFactory.INSTANCE, hasSecondaries);
+        RecordDescriptor upsertOutRecDesc = 
getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
+                filterFields == null ? 0 : filterFields.length, recordType, 
metaType);
+        // fix pk fields
+        int diff = upsertOutRecDesc.getFieldCount() - 
primaryIndexInfo.rDesc.getFieldCount();
+        int[] pkFieldsInCommitOp = new int[dataset.getPrimaryKeys().size()];
+        for (int i = 0; i < pkFieldsInCommitOp.length; i++) {
+            pkFieldsInCommitOp[i] = diff + i;
+        }
+        CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), 
dataset.getDatasetId(), pkFieldsInCommitOp,
+                false, true, PARTITION, true);
+        insertOp.setOutputFrameWriter(0, commitOp, upsertOutRecDesc);
+        commitOp.setInputRecordDescriptor(0, upsertOutRecDesc);
+        return Pair.of(insertOp, commitOp);
+    }
+
+    private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor 
inputRecordDesc, Dataset dataset, int numFilterFields,
+            ARecordType itemType, ARecordType metaItemType) throws Exception {
+        ITypeTraits[] outputTypeTraits =
+                new ITypeTraits[inputRecordDesc.getFieldCount() + 
(dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+        ISerializerDeserializer<?>[] outputSerDes = new 
ISerializerDeserializer[inputRecordDesc.getFieldCount()
+                + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+
+        // add the previous record first
+        int f = 0;
+        outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+        f++;
+        // add the previous meta second
+        if (dataset.hasMetaPart()) {
+            outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
+            outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+            f++;
+        }
+        // add the previous filter third
+        int fieldIdx = -1;
+        if (numFilterFields > 0) {
+            String filterField = DatasetUtil.getFilterField(dataset).get(0);
+            String[] fieldNames = itemType.getFieldNames();
+            int i = 0;
+            for (; i < fieldNames.length; i++) {
+                if (fieldNames[i].equals(filterField)) {
+                    break;
+                }
+            }
+            fieldIdx = i;
+            outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider()
+                    .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
+                    
.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+            f++;
+        }
+        for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
+            outputTypeTraits[j + f] = inputRecordDesc.getTypeTraits()[j];
+            outputSerDes[j + f] = inputRecordDesc.getFields()[j];
+        }
+        return new RecordDescriptor(outputSerDes, outputTypeTraits);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
index 8d21b55..c50a4a2 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.test.active;
 
-import org.apache.asterix.active.SingleThreadEventProcessor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
 
 public class Actor extends SingleThreadEventProcessor<Action> {
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 76bec8c..36cb4bb 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -132,12 +132,12 @@ public class ComponentRollbackTest {
     public void createIndex() throws Exception {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME,
-                NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, 
PartitioningStrategy.HASH,
+        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                NoMergePolicyFactory.NAME, null, new 
InternalDatasetDetails(null, PartitioningStrategy.HASH,
                         partitioningKeys, null, null, null, false, null, 
false),
                 null, DatasetType.INTERNAL, DATASET_ID, 0);
-        PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, 
KEY_TYPES, RECORD_TYPE, META_TYPE,
-                new NoMergePolicyFactory(), null, null, storageManager, 
KEY_INDEXES, KEY_INDICATORS_LIST);
+        PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, 
KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+                storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
         IndexDataflowHelperFactory iHelperFactory =
                 new IndexDataflowHelperFactory(nc.getStorageManager(), 
primaryIndexInfo.getFileSplitProvider());
         ctx = nc.createTestContext(false);
@@ -146,9 +146,9 @@ public class ComponentRollbackTest {
         lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
         indexDataflowHelper.close();
         nc.newJobId();
-        txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
-        insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, 
META_TYPE, new NoMergePolicyFactory(),
-                null, null, KEY_INDEXES, KEY_INDICATORS_LIST, 
storageManager).getLeft();
+        txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+        insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, 
META_TYPE, null, KEY_INDEXES,
+                KEY_INDICATORS_LIST, storageManager).getLeft();
     }
 
     @After
@@ -174,7 +174,7 @@ public class ComponentRollbackTest {
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != 
TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -224,7 +224,7 @@ public class ComponentRollbackTest {
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != 
TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -254,9 +254,9 @@ public class ComponentRollbackTest {
 
             // insert again
             nc.newJobId();
-            txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
-            insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, 
RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
-                    null, null, KEY_INDEXES, KEY_INDICATORS_LIST, 
storageManager).getLeft();
+            txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+            insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, 
RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+                    KEY_INDICATORS_LIST, storageManager).getLeft();
             insertOp.open();
             for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
                 ITupleReference tuple = tupleGenerator.next();
@@ -291,7 +291,7 @@ public class ComponentRollbackTest {
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != 
TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -360,7 +360,7 @@ public class ComponentRollbackTest {
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != 
TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -416,7 +416,7 @@ public class ComponentRollbackTest {
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != 
TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -481,7 +481,7 @@ public class ComponentRollbackTest {
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != 
TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -541,7 +541,7 @@ public class ComponentRollbackTest {
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != 
TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -603,7 +603,7 @@ public class ComponentRollbackTest {
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != 
TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -675,7 +675,7 @@ public class ComponentRollbackTest {
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != 
TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index a0e3aa9..82eb16a 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -110,20 +110,19 @@ public class LogMarkerTest {
             StorageComponentProvider storageManager = new 
StorageComponentProvider();
             List<List<String>> partitioningKeys = new ArrayList<>();
             partitioningKeys.add(Collections.singletonList("key"));
-            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME,
-                    NODE_GROUP_NAME, null, null, new 
InternalDatasetDetails(null, PartitioningStrategy.HASH,
+            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                    NoMergePolicyFactory.NAME, null, new 
InternalDatasetDetails(null, PartitioningStrategy.HASH,
                             partitioningKeys, null, null, null, false, null, 
false),
                     null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
-                PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, 
KEY_TYPES, RECORD_TYPE, META_TYPE,
-                        new NoMergePolicyFactory(), null, null, 
storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
+                PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, 
KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+                        storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
                 IHyracksTaskContext ctx = nc.createTestContext(true);
                 nc.newJobId();
-                ITransactionContext txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
-                LSMInsertDeleteOperatorNodePushable insertOp = nc
-                        .getInsertPipeline(ctx, dataset, KEY_TYPES, 
RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
-                                null, null, KEY_INDEXES, KEY_INDICATORS_LIST, 
storageManager)
-                        .getLeft();
+                ITransactionContext txnCtx =
+                        
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+                LSMInsertDeleteOperatorNodePushable insertOp = 
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, 
KEY_INDICATORS_LIST, storageManager).getLeft();
                 insertOp.open();
                 TupleGenerator tupleGenerator = new 
TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, 
META_GEN_FUNCTION, UNIQUE_META_FIELDS);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 41b3d38..5384c92 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -119,21 +119,20 @@ public class CheckpointingTest {
             nc.init();
             List<List<String>> partitioningKeys = new ArrayList<>();
             partitioningKeys.add(Collections.singletonList("key"));
-            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME,
-                    NODE_GROUP_NAME, null, null, new 
InternalDatasetDetails(null, PartitioningStrategy.HASH,
+            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                    NoMergePolicyFactory.NAME, null, new 
InternalDatasetDetails(null, PartitioningStrategy.HASH,
                             partitioningKeys, null, null, null, false, null, 
false),
                     null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
-                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, 
META_TYPE, new NoMergePolicyFactory(), null,
-                        null, storageManager, KEY_INDEXES, KEY_INDICATOR_LIST);
+                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, 
META_TYPE, null, storageManager, KEY_INDEXES,
+                        KEY_INDICATOR_LIST);
                 IHyracksTaskContext ctx = nc.createTestContext(false);
                 nc.newJobId();
-                ITransactionContext txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+                ITransactionContext txnCtx =
+                        
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
                 // Prepare insert operation
-                LSMInsertDeleteOperatorNodePushable insertOp = nc
-                        .getInsertPipeline(ctx, dataset, KEY_TYPES, 
RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
-                                null, null, KEY_INDEXES, KEY_INDICATOR_LIST, 
storageManager)
-                        .getLeft();
+                LSMInsertDeleteOperatorNodePushable insertOp = 
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, 
KEY_INDICATOR_LIST, storageManager).getLeft();
                 insertOp.open();
                 TupleGenerator tupleGenerator = new 
TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, 
META_GEN_FUNCTION, UNIQUE_META_FIELDS);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
index 58697a9..eb47248 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
@@ -119,26 +119,23 @@ public class DiskIsFullTest {
             StorageComponentProvider storageManager = new 
StorageComponentProvider();
             List<List<String>> partitioningKeys = new ArrayList<>();
             partitioningKeys.add(Collections.singletonList("key"));
-            Dataset dataset =
-                    new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, 
DATA_TYPE_NAME, NODE_GROUP_NAME, null,
-                            null,
-                            new InternalDatasetDetails(null, 
PartitioningStrategy.HASH, partitioningKeys, null, null,
-                                    null, false, null, false), null, 
DatasetType.INTERNAL, DATASET_ID, 0);
+            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                    NoMergePolicyFactory.NAME, null, new 
InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                            partitioningKeys, null, null, null, false, null, 
false),
+                    null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
-                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, 
META_TYPE, new NoMergePolicyFactory(), null,
-                        null, storageManager, KEY_INDEXES, KEY_INDICATOR_LIST);
+                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, 
META_TYPE, null, storageManager, KEY_INDEXES,
+                        KEY_INDICATOR_LIST);
                 IHyracksTaskContext ctx = nc.createTestContext(false);
                 nc.newJobId();
-                ITransactionContext txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+                ITransactionContext txnCtx =
+                        
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
                 // Prepare insert operation
-                LSMInsertDeleteOperatorNodePushable insertOp =
-                        nc.getInsertPipeline(ctx, dataset, KEY_TYPES, 
RECORD_TYPE, META_TYPE,
-                                new NoMergePolicyFactory(), null, null, 
KEY_INDEXES, KEY_INDICATOR_LIST, storageManager)
-                                .getLeft();
+                LSMInsertDeleteOperatorNodePushable insertOp = 
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, 
KEY_INDICATOR_LIST, storageManager).getLeft();
                 insertOp.open();
-                TupleGenerator tupleGenerator =
-                        new TupleGenerator(RECORD_TYPE, META_TYPE, 
KEY_INDEXES, KEY_INDICATOR, RECORD_GEN_FUNCTION,
-                                UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, 
UNIQUE_META_FIELDS);
+                TupleGenerator tupleGenerator = new 
TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
+                        RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, 
META_GEN_FUNCTION, UNIQUE_META_FIELDS);
                 VSizeFrame frame = new VSizeFrame(ctx);
                 FrameTupleAppender tupleAppender = new 
FrameTupleAppender(frame);
                 // Insert records until disk becomes full

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 895cd55..6d3b6c2 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -229,7 +229,6 @@ public class ErrorCode {
     public static final int ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED = 3097;
     public static final int CANNOT_DERIGESTER_ACTIVE_ENTITY_LISTENER = 3098;
     public static final int 
DOUBLE_INITIALIZATION_OF_ACTIVE_NOTIFICATION_HANDLER = 3099;
-    public static final int FAILED_TO_SHUTDOWN_EVENT_PROCESSOR = 3100;
     public static final int DOUBLE_RECOVERY_ATTEMPTS = 3101;
     public static final int UNREPORTED_TASK_FAILURE_EXCEPTION = 3102;
     public static final int ACTIVE_ENTITY_ALREADY_SUSPENDED = 3103;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 98b8e2f..e428721 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -218,7 +218,6 @@
 3097 = Active Entity %1$s has not been registered
 3098 = Cannot deregister %1$s because it is active
 3099 = Attempt to initialize an initialized Active Notification Handler
-3100 = Failed to shutdown event processor for %1$s
 3101 = Recovery request while recovery is currently ongoing
 3102 = Unreported exception causing task failure
 3103 = %1$s is already suspended and has state %2$s

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index a071345..42da1d7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -133,6 +133,10 @@ public class ErrorCode {
     public static final int ILLEGAL_ATTEMPT_TO_EXIT_EMPTY_COMPONENT = 97;
     public static final int A_FLUSH_OPERATION_HAS_FAILED = 98;
     public static final int A_MERGE_OPERATION_HAS_FAILED = 99;
+    public static final int FAILED_TO_SHUTDOWN_EVENT_PROCESSOR = 100;
+    public static final int PAGE_DOES_NOT_EXIST_IN_FILE = 101;
+    public static final int VBC_ALREADY_OPEN = 102;
+    public static final int VBC_ALREADY_CLOSED = 103;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
new file mode 100644
index 0000000..21965c7
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.util;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class SingleThreadEventProcessor<T> implements Runnable {
+
+    private static final Logger LOGGER = 
Logger.getLogger(SingleThreadEventProcessor.class.getName());
+    private final String name;
+    private final LinkedBlockingQueue<T> eventInbox;
+    private volatile Thread executorThread;
+    private volatile boolean stopped = false;
+
+    public SingleThreadEventProcessor(String threadName) {
+        this.name = threadName;
+        eventInbox = new LinkedBlockingQueue<>();
+        executorThread = new Thread(this, threadName);
+        executorThread.start();
+    }
+
+    @Override
+    public final void run() {
+        LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName());
+        while (!stopped) {
+            try {
+                T event = eventInbox.take();
+                handle(event);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Error handling an event", e);
+            }
+        }
+        LOGGER.log(Level.WARNING, "Stopped " + 
Thread.currentThread().getName());
+    }
+
+    protected abstract void handle(T event) throws Exception; //NOSONAR
+
+    public void add(T event) {
+        if (!eventInbox.add(event)) {
+            throw new IllegalStateException();
+        }
+    }
+
+    public void stop() throws HyracksDataException, InterruptedException {
+        stopped = true;
+        executorThread.interrupt();
+        executorThread.join(1000);
+        int attempt = 0;
+        while (executorThread.isAlive()) {
+            attempt++;
+            LOGGER.log(Level.WARNING,
+                    "Failed to stop event processor after " + attempt + " 
attempts. Interrupted exception swallowed?");
+            if (attempt == 10) {
+                throw 
HyracksDataException.create(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
+            }
+            executorThread.interrupt();
+            executorThread.join(1000);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index ae579b4..bc83f8c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -116,5 +116,9 @@
 97 = Illegal attempt to exit empty component
 98 = A flush operation has failed
 99 = A merge operation has failed
+100 = Failed to shutdown event processor for %1$s
+101 = Page %1$s does not exist in file %2$s
+102 = Failed to open virtual buffer cache since it is already open
+103 = Failed to close virtual buffer cache since it is already closed
 
 10000 = The given rule collection %1$s is not an instance of the List class.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
index 77f18ea..05417a8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
@@ -25,19 +25,6 @@ public abstract class AbstractPointable implements 
IPointable {
 
     protected int length;
 
-    /**
-     * copies the content of this pointable to the passed byte array.
-     * the array is expected to be at least of length = length of this 
pointable
-     *
-     * @param copy
-     *            the array to write into
-     * @throws ArrayIndexOutOfBoundsException
-     *             if the passed array size is smaller than length
-     */
-    public void copyInto(byte[] copy) {
-        System.arraycopy(bytes, start, copy, 0, length);
-    }
-
     @Override
     public void set(byte[] bytes, int start, int length) {
         this.bytes = bytes;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
new file mode 100644
index 0000000..5de0b84
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.data.std.util;
+
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class DataUtils {
+
+    private DataUtils() {
+    }
+
+    /**
+     * Copies the content of this pointable to the passed byte array.
+     * the array is expected to be at least of length = length of this 
pointable
+     *
+     * @param value
+     *            the value to be copied
+     * @param copy
+     *            the array to write into
+     * @throws ArrayIndexOutOfBoundsException
+     *             if the passed array size is smaller than length
+     */
+    public static void copyInto(IValueReference value, byte[] copy) {
+        System.arraycopy(value.getByteArray(), value.getStartOffset(), copy, 
0, value.getLength());
+    }
+
+    /**
+     * Copies the content of this pointable to the passed byte array.
+     * the array is expected to be at least of length = offset + length of 
this pointable
+     *
+     * @param value
+     *            the value to be copied
+     * @param copy
+     *            the array to write into
+     * @param offset
+     *            the offset to start writing from
+     * @throws ArrayIndexOutOfBoundsException
+     *             if the passed array size - offset is smaller than length
+     */
+    public static void copyInto(IValueReference value, byte[] copy, int 
offset) {
+        System.arraycopy(value.getByteArray(), value.getStartOffset(), copy, 
offset, value.getLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 5e3042e..1fbcbb0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -1135,7 +1135,7 @@ public class BTree extends AbstractTreeIndex {
 
                 ((IBTreeInteriorFrame) interiorFrame).deleteGreatest();
                 int finalPageId = freePageManager.takePage(metaFrame);
-                bufferCache.setPageDiskId(frontier.page, 
BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
+                
frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), 
finalPageId));
                 pagesToWrite.add(frontier.page);
                 splitKey.setLeftPage(finalPageId);
 
@@ -1156,7 +1156,7 @@ public class BTree extends AbstractTreeIndex {
             if (level < 1) {
                 ICachedPage lastLeaf = nodeFrontiers.get(level).page;
                 int lastLeafPage = nodeFrontiers.get(level).pageId;
-                setPageDpid(lastLeaf, nodeFrontiers.get(level).pageId);
+                
lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), 
nodeFrontiers.get(level).pageId));
                 queue.put(lastLeaf);
                 nodeFrontiers.get(level).page = null;
                 persistFrontiers(level + 1, lastLeafPage);
@@ -1171,7 +1171,7 @@ public class BTree extends AbstractTreeIndex {
             }
             ((IBTreeInteriorFrame) 
interiorFrame).setRightmostChildPageId(rightPage);
             int finalPageId = freePageManager.takePage(metaFrame);
-            setPageDpid(frontier.page, finalPageId);
+            
frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), 
finalPageId));
             queue.put(frontier.page);
             frontier.pageId = finalPageId;
 
@@ -1193,10 +1193,6 @@ public class BTree extends AbstractTreeIndex {
         public void abort() throws HyracksDataException {
             super.handleException();
         }
-
-        private void setPageDpid(ICachedPage page, int pageId) {
-            bufferCache.setPageDiskId(page, 
BufferedFileHandle.getDiskPageId(getFileId(), pageId));
-        }
     }
 
     @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
index 1ee48f6..a051364 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
@@ -219,7 +219,7 @@ public class AppendOnlyLinkedMetadataPageManager implements 
IMetadataPageManager
                 confiscatedPage.releaseWriteLatch(false);
             }
             int finalMetaPage = getMaxPageId(metaFrame) + 1;
-            bufferCache.setPageDiskId(confiscatedPage, 
BufferedFileHandle.getDiskPageId(fileId, finalMetaPage));
+            
confiscatedPage.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, 
finalMetaPage));
             queue.put(confiscatedPage);
             bufferCache.finishQueue();
             metadataPage = getMetadataPageId();
@@ -345,8 +345,10 @@ public class AppendOnlyLinkedMetadataPageManager 
implements IMetadataPageManager
             try {
                 frame.setPage(page);
                 int inPageOffset = frame.getOffset(key);
-                return inPageOffset >= 0 ? ((long) pageId * 
bufferCache.getPageSizeWithHeader()) + frame.getOffset(key)
-                        + IBufferCache.RESERVED_HEADER_BYTES : -1L;
+                return inPageOffset >= 0
+                        ? ((long) pageId * 
bufferCache.getPageSizeWithHeader()) + frame.getOffset(key)
+                                + IBufferCache.RESERVED_HEADER_BYTES
+                        : -1L;
             } finally {
                 page.releaseReadLatch();
                 unpinPage(page);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
index 686fe78..d8afd12 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
@@ -195,7 +195,7 @@ public class LinkedMetaDataPageManager implements 
IMetadataPageManager {
             metaFrame.setMaxPage(1);
         } finally {
             metaNode.releaseWriteLatch(true);
-            bufferCache.flushDirtyPage(metaNode);
+            bufferCache.flush(metaNode);
             bufferCache.unpin(metaNode);
         }
         int rootPage = getRootPageId();
@@ -207,7 +207,7 @@ public class LinkedMetaDataPageManager implements 
IMetadataPageManager {
             leafFrame.initBuffer((byte) 0);
         } finally {
             rootNode.releaseWriteLatch(true);
-            bufferCache.flushDirtyPage(rootNode);
+            bufferCache.flush(rootNode);
             bufferCache.unpin(rootNode);
         }
     }
@@ -249,7 +249,7 @@ public class LinkedMetaDataPageManager implements 
IMetadataPageManager {
                 metaFrame.setValid(true);
             } finally {
                 metaNode.releaseWriteLatch(true);
-                bufferCache.flushDirtyPage(metaNode);
+                bufferCache.flush(metaNode);
                 bufferCache.unpin(metaNode);
                 ready = true;
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index ae66402..f03a358 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -342,4 +342,9 @@ public abstract class AbstractTreeIndex implements 
ITreeIndex {
     public boolean hasMemoryComponents() {
         return true;
     }
+
+    @Override
+    public String toString() {
+        return "{\"class\":\"" + getClass().getSimpleName() + "\",\"file\":\"" 
+ file.getRelativePath() + "\"}";
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 280cc52..0b59e91 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -197,4 +197,9 @@ public abstract class AbstractLSMDiskComponent extends 
AbstractLSMComponent impl
                 .addBulkLoader(createIndexBulkLoader(fillFactor, verifyInput, 
numElementsHint, checkIfEmptyIndex));
         return chainedBulkLoader;
     }
+
+    @Override
+    public String toString() {
+        return "{\"class\":" + getClass().getSimpleName() + "\", \"index\":" + 
getIndex().toString() + "}";
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index a4fe35c..57db635 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -255,6 +255,6 @@ public abstract class AbstractLSMMemoryComponent extends 
AbstractLSMComponent im
     @Override
     public long getSize() {
         IBufferCache virtualBufferCache = getIndex().getBufferCache();
-        return virtualBufferCache.getNumPages() * (long) 
virtualBufferCache.getPageSize();
+        return virtualBufferCache.getPageBudget() * (long) 
virtualBufferCache.getPageSize();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
index b7d2ea3..d1244ce 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
@@ -45,7 +45,7 @@ public class DiskComponentMetadata implements 
IComponentMetadata {
 
     @Override
     public IValueReference get(IValueReference key) throws 
HyracksDataException {
-        IPointable value = VoidPointable.FACTORY.createPointable();
+        VoidPointable value = VoidPointable.FACTORY.createPointable();
         get(key, value);
         return value;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
index dcc9355..1b827b7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
@@ -20,6 +20,8 @@ package org.apache.hyracks.storage.am.lsm.common.impls;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,6 +33,7 @@ import 
org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
 import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
 
 public class MemoryComponentMetadata implements IComponentMetadata {
+    private static final Logger LOGGER = 
Logger.getLogger(MemoryComponentMetadata.class.getName());
     private static final byte[] empty = new byte[0];
     private final List<org.apache.commons.lang3.tuple.Pair<IValueReference, 
ArrayBackedValueStorage>> store =
             new ArrayList<>();
@@ -43,9 +46,9 @@ public class MemoryComponentMetadata implements 
IComponentMetadata {
         ArrayBackedValueStorage stored = get(key);
         if (stored == null) {
             stored = new ArrayBackedValueStorage();
+            store.add(Pair.of(key, stored));
         }
         stored.assign(value);
-        store.add(Pair.of(key, stored));
     }
 
     /**
@@ -71,8 +74,12 @@ public class MemoryComponentMetadata implements 
IComponentMetadata {
     }
 
     public void copy(IMetadataPageManager mdpManager) throws 
HyracksDataException {
+        LOGGER.log(Level.INFO, "Copying Metadata into a different component");
         ITreeIndexMetadataFrame frame = mdpManager.createMetadataFrame();
         for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.log(Level.INFO, "Copying " + pair.getKey() + " : " + 
pair.getValue().getLength() + " bytes");
+            }
             mdpManager.put(frame, pair.getKey(), pair.getValue());
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
index 83e140c..7a3d58b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
@@ -20,8 +20,6 @@ package org.apache.hyracks.storage.am.lsm.common.impls;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -34,7 +32,6 @@ import org.apache.hyracks.storage.common.file.IFileMapManager;
 import org.apache.hyracks.util.JSONUtil;
 
 public class MultitenantVirtualBufferCache implements IVirtualBufferCache {
-    private static final Logger LOGGER = 
Logger.getLogger(ExternalIndexHarness.class.getName());
 
     private final IVirtualBufferCache vbc;
     private int openCount;
@@ -65,11 +62,6 @@ public class MultitenantVirtualBufferCache implements 
IVirtualBufferCache {
     }
 
     @Override
-    public ICachedPage tryPin(long dpid) throws HyracksDataException {
-        return vbc.tryPin(dpid);
-    }
-
-    @Override
     public ICachedPage pin(long dpid, boolean newPage) throws 
HyracksDataException {
         return vbc.pin(dpid, newPage);
     }
@@ -80,8 +72,8 @@ public class MultitenantVirtualBufferCache implements 
IVirtualBufferCache {
     }
 
     @Override
-    public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
-        vbc.flushDirtyPage(page);
+    public void flush(ICachedPage page) throws HyracksDataException {
+        vbc.flush(page);
     }
 
     @Override
@@ -100,8 +92,8 @@ public class MultitenantVirtualBufferCache implements 
IVirtualBufferCache {
     }
 
     @Override
-    public int getNumPages() {
-        return vbc.getNumPages();
+    public int getPageBudget() {
+        return vbc.getPageBudget();
     }
 
     @Override
@@ -141,14 +133,6 @@ public class MultitenantVirtualBufferCache implements 
IVirtualBufferCache {
     }
 
     @Override
-    public void adviseWontNeed(ICachedPage page) {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.log(Level.INFO, "Calling adviseWontNeed on " + 
this.getClass().getName()
-                    + " makes no sense as this BufferCache cannot evict 
pages");
-        }
-    }
-
-    @Override
     public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
         return vbc.confiscatePage(dpid);
     }
@@ -175,11 +159,6 @@ public class MultitenantVirtualBufferCache implements 
IVirtualBufferCache {
     }
 
     @Override
-    public void setPageDiskId(ICachedPage page, long dpid) {
-
-    }
-
-    @Override
     public void returnPage(ICachedPage page, boolean reinsert) {
 
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
index 3195f57..8ce636b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
@@ -31,13 +31,13 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 public class NoMergePolicyFactory implements ILSMMergePolicyFactory {
 
     private static final long serialVersionUID = 1L;
-
     private static final String[] SET_VALUES = new String[] {};
     private static final Set<String> PROPERTIES_NAMES = new 
HashSet<>(Arrays.asList(SET_VALUES));
+    public static final String NAME = "no-merge";
 
     @Override
     public String getName() {
-        return "no-merge";
+        return NAME;
     }
 
     @Override

Reply via email to