This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 631846b  [ASTERIXDB-2552][RT] Reuse objects on reseting buffer manager
631846b is described below

commit 631846bfb6f6e8546ac6a544afbd071a6b724344
Author: Ali Alsuliman <[email protected]>
AuthorDate: Wed May 1 20:04:36 2019 -0700

    [ASTERIXDB-2552][RT] Reuse objects on reseting buffer manager
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    The "VariableFrameMemoryManager" releases the physical frames
    objects, the logical frames objects and the free slots objects
    on reset(). This patch is to retain those objects and only
    release them on close() (as also stated in the interface).
    
    - added close() to IFrameFreeSlotPolicy to make the interface similar
    to the other interfaces (IFrameBufferManager, IFramePool, ... etc)
    where close() is supposed to release while reset() is supposed to only
    reset but retain resources.
    - renamed "InMemorySortRuntimeFactory" to "MicroSortRuntimeFactory".
    
    Change-Id: I88bea27e5024c621412ef609475e2a7ba1913afa
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3372
    Contrib: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Dmitry Lychagin <[email protected]>
    Reviewed-by: Till Westmann <[email protected]>
---
 .../physical/MicroStableSortPOperator.java         |  4 +-
 ...meFactory.java => MicroSortRuntimeFactory.java} |  6 +--
 .../tests/pushruntime/PushRuntimeTest.java         | 10 ++---
 .../buffermanager/FrameFreeSlotBiggestFirst.java   |  6 +++
 .../std/buffermanager/FrameFreeSlotLastFit.java    | 20 ++++++---
 .../buffermanager/FrameFreeSlotSmallestFit.java    |  8 +++-
 .../std/buffermanager/IFrameBufferManager.java     |  9 ++--
 .../std/buffermanager/IFrameFreeSlotPolicy.java    | 16 ++++---
 .../VariableDeletableTupleMemoryManager.java       |  2 +-
 .../buffermanager/VariableFrameMemoryManager.java  | 52 +++++++++++++++-------
 10 files changed, 90 insertions(+), 43 deletions(-)

diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
index c6a7d9d..413c1a4 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
@@ -32,7 +32,7 @@ import 
org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import 
org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import 
org.apache.hyracks.algebricks.runtime.operators.sort.InMemorySortRuntimeFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.sort.MicroSortRuntimeFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -80,7 +80,7 @@ public class MicroStableSortPOperator extends 
AbstractStableSortPOperator {
             i++;
         }
 
-        IPushRuntimeFactory runtime = new 
InMemorySortRuntimeFactory(sortFields, nkcf, comps, null, maxNumberOfFrames);
+        IPushRuntimeFactory runtime = new MicroSortRuntimeFactory(sortFields, 
nkcf, comps, null, maxNumberOfFrames);
         builder.contributeMicroOperator(op, runtime, recDescriptor);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/MicroSortRuntimeFactory.java
similarity index 96%
rename from 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
rename to 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/MicroSortRuntimeFactory.java
index b34b7b9..ad44c90 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/MicroSortRuntimeFactory.java
@@ -39,7 +39,7 @@ import org.apache.hyracks.dataflow.std.sort.Algorithm;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
 
-public class InMemorySortRuntimeFactory extends 
AbstractOneInputOneOutputRuntimeFactory {
+public class MicroSortRuntimeFactory extends 
AbstractOneInputOneOutputRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
     private final int framesLimit;
@@ -47,14 +47,14 @@ public class InMemorySortRuntimeFactory extends 
AbstractOneInputOneOutputRuntime
     private final INormalizedKeyComputerFactory[] keyNormalizerFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
 
-    public InMemorySortRuntimeFactory(int[] sortFields, 
INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+    public MicroSortRuntimeFactory(int[] sortFields, 
INormalizedKeyComputerFactory firstKeyNormalizerFactory,
             IBinaryComparatorFactory[] comparatorFactories, int[] 
projectionList, int framesLimit) {
         this(sortFields, firstKeyNormalizerFactory != null
                 ? new INormalizedKeyComputerFactory[] { 
firstKeyNormalizerFactory } : null, comparatorFactories,
                 projectionList, framesLimit);
     }
 
-    public InMemorySortRuntimeFactory(int[] sortFields, 
INormalizedKeyComputerFactory[] keyNormalizerFactories,
+    public MicroSortRuntimeFactory(int[] sortFields, 
INormalizedKeyComputerFactory[] keyNormalizerFactories,
             IBinaryComparatorFactory[] comparatorFactories, int[] 
projectionList, int framesLimit) {
         super(projectionList);
         // Obs: the projection list is currently ignored.
diff --git 
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
 
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index e9b3fc3..aeb22b6 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -47,7 +47,7 @@ import 
org.apache.hyracks.algebricks.runtime.operators.aggrun.RunningAggregateRu
 import 
org.apache.hyracks.algebricks.runtime.operators.group.MicroPreClusteredGroupRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import 
org.apache.hyracks.algebricks.runtime.operators.meta.SubplanRuntimeFactory;
-import 
org.apache.hyracks.algebricks.runtime.operators.sort.InMemorySortRuntimeFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.sort.MicroSortRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory;
@@ -718,8 +718,8 @@ public class PushRuntimeTest {
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
 
         // the algebricks op.
-        InMemorySortRuntimeFactory sort =
-                new InMemorySortRuntimeFactory(new int[] { 1 }, 
(INormalizedKeyComputerFactory) null,
+        MicroSortRuntimeFactory sort =
+                new MicroSortRuntimeFactory(new int[] { 1 }, 
(INormalizedKeyComputerFactory) null,
                         new IBinaryComparatorFactory[] { 
UTF8StringBinaryComparatorFactory.INSTANCE }, null, 50);
         RecordDescriptor sortDesc = scannerDesc;
 
@@ -834,8 +834,8 @@ public class PushRuntimeTest {
 
         // the sort (by nation id)
         RecordDescriptor sortDesc = scannerDesc;
-        InMemorySortRuntimeFactory sort =
-                new InMemorySortRuntimeFactory(new int[] { 3 }, 
(INormalizedKeyComputerFactory) null,
+        MicroSortRuntimeFactory sort =
+                new MicroSortRuntimeFactory(new int[] { 3 }, 
(INormalizedKeyComputerFactory) null,
                         new IBinaryComparatorFactory[] { 
IntegerBinaryComparatorFactory.INSTANCE }, null, 50);
 
         // the group-by
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirst.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirst.java
index 6f5587f..8c81dba 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirst.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirst.java
@@ -101,6 +101,12 @@ class FrameFreeSlotBiggestFirst implements 
IFrameFreeSlotPolicy {
 
     @Override
     public void reset() {
+        // TODO(ali): fix to not release resources
+        heap.reset();
+    }
+
+    @Override
+    public void close() {
         heap.reset();
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFit.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFit.java
index 819ff80..9afe6d1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFit.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFit.java
@@ -38,15 +38,17 @@ class FrameFreeSlotLastFit implements IFrameFreeSlotPolicy {
         }
     }
 
+    private final int initialNumFrames;
     private FrameSpace[] frameSpaces;
     private int size;
 
-    public FrameFreeSlotLastFit(int initialFrameNumber) {
-        frameSpaces = new FrameSpace[initialFrameNumber];
+    FrameFreeSlotLastFit(int initialFrameNumber) {
+        initialNumFrames = initialFrameNumber;
+        frameSpaces = new FrameSpace[initialNumFrames];
         size = 0;
     }
 
-    public FrameFreeSlotLastFit() {
+    FrameFreeSlotLastFit() {
         this(INITIAL_CAPACITY);
     }
 
@@ -65,6 +67,9 @@ class FrameFreeSlotLastFit implements IFrameFreeSlotPolicy {
 
     @Override
     public void pushNewFrame(int frameID, int freeSpace) {
+        if (frameSpaces == null) {
+            frameSpaces = new FrameSpace[initialNumFrames];
+        }
         if (size >= frameSpaces.length) {
             frameSpaces = Arrays.copyOf(frameSpaces, size * 2);
         }
@@ -78,8 +83,11 @@ class FrameFreeSlotLastFit implements IFrameFreeSlotPolicy {
     @Override
     public void reset() {
         size = 0;
-        for (int i = frameSpaces.length - 1; i >= 0; i--) {
-            frameSpaces[i] = null;
-        }
+    }
+
+    @Override
+    public void close() {
+        size = 0;
+        frameSpaces = null;
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotSmallestFit.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotSmallestFit.java
index ada6752..77518ec 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotSmallestFit.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotSmallestFit.java
@@ -27,7 +27,7 @@ class FrameFreeSlotSmallestFit implements 
IFrameFreeSlotPolicy {
 
     private TreeMap<Integer, LinkedList<Integer>> freeSpaceIndex;
 
-    public FrameFreeSlotSmallestFit() {
+    FrameFreeSlotSmallestFit() {
         freeSpaceIndex = new TreeMap<>();
     }
 
@@ -58,6 +58,12 @@ class FrameFreeSlotSmallestFit implements 
IFrameFreeSlotPolicy {
 
     @Override
     public void reset() {
+        // TODO(ali): fix to not release resources
+        freeSpaceIndex.clear();
+    }
+
+    @Override
+    public void close() {
         freeSpaceIndex.clear();
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
index 1118bf3..8b73b54 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
@@ -23,19 +23,19 @@ import java.nio.ByteBuffer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
- * Manage the buffer space in the unit of frame.
+ * Manages the buffer space in the unit of frame.
  */
 public interface IFrameBufferManager {
 
     /**
-     * Reset the counters and flags to initial status. This method should not 
release the pre-allocated resources
+     * Resets the counters and flags to initial status. This method should not 
release the pre-allocated resources
      *
      * @throws org.apache.hyracks.api.exceptions.HyracksDataException
      */
     void reset() throws HyracksDataException;
 
     /**
-     * @param frameIndex
+     * @param frameIndex index of frame requested
      * @param bufferInfo the given object need to be reset
      * @return the filled bufferInfo to facilitate the chain access
      */
@@ -54,6 +54,9 @@ public interface IFrameBufferManager {
      */
     int insertFrame(ByteBuffer frame) throws HyracksDataException;
 
+    /**
+     * Releases the allocated resources.
+     */
     void close();
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameFreeSlotPolicy.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameFreeSlotPolicy.java
index 8a1e004..5679844 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameFreeSlotPolicy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameFreeSlotPolicy.java
@@ -22,8 +22,8 @@ package org.apache.hyracks.dataflow.std.buffermanager;
 public interface IFrameFreeSlotPolicy {
 
     /**
-     * Find the best fit frame id which can hold the data, and then pop it out 
from the index.
-     * Return -1 is failed to find any.
+     * Finds the best fit frame id which can hold the data, and then pops it 
out from the index.
+     * Returns -1 if failed to find any.
      *
      * @param tobeInsertedSize the actual size of the data which should include
      *                         the meta data like the field offset and the 
tuple
@@ -33,16 +33,20 @@ public interface IFrameFreeSlotPolicy {
     int popBestFit(int tobeInsertedSize);
 
     /**
-     * Register the new free slot into the index
+     * Registers the new free slot into the index.
      *
-     * @param frameID
-     * @param freeSpace
+     * @param frameID frame id
+     * @param freeSpace how much free space exists in this frame
      */
     void pushNewFrame(int frameID, int freeSpace);
 
     /**
-     * Clear all the existing free slot information.
+     * Clears all the existing free slot information. This method should not 
release the allocated resources.
      */
     void reset();
 
+    /**
+     * Releases the allocated resources.
+     */
+    void close();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
index 6c67ecc..33d0fda 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
@@ -158,7 +158,7 @@ public class VariableDeletableTupleMemoryManager implements 
IDeletableTupleBuffe
     @Override
     public void close() {
         pool.close();
-        policy.reset();
+        policy.close();
         frames.clear();
         numTuples = 0;
         if (LOG.isDebugEnabled()) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
index 6604ba8..50db2ec 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
@@ -37,18 +37,23 @@ public class VariableFrameMemoryManager implements 
IFrameBufferManager {
             physicalFrame = frame;
             physicalOffset = offset;
         }
+
+        void reset(ByteBuffer frame, int offset) {
+            physicalFrame = frame;
+            physicalOffset = offset;
+        }
     }
 
     private final IFramePool framePool;
-    private List<PhysicalFrameOffset> physicalFrameOffsets;
-    private List<BufferInfo> logicalFrameStartSizes;
     private final IFrameFreeSlotPolicy freeSlotPolicy;
+    private final List<PhysicalFrameOffset> physicalFrames = new ArrayList<>();
+    private final List<BufferInfo> logicalFrames = new ArrayList<>();
+    private int numPhysicalFrames = 0;
+    private int numLogicalFrames = 0;
 
     public VariableFrameMemoryManager(IFramePool framePool, 
IFrameFreeSlotPolicy freeSlotPolicy) {
         this.framePool = framePool;
         this.freeSlotPolicy = freeSlotPolicy;
-        this.physicalFrameOffsets = new ArrayList<>();
-        this.logicalFrameStartSizes = new ArrayList<>();
     }
 
     private int findAvailableFrame(int frameSize) throws HyracksDataException {
@@ -59,29 +64,37 @@ public class VariableFrameMemoryManager implements 
IFrameBufferManager {
         ByteBuffer buffer = framePool.allocateFrame(frameSize);
         if (buffer != null) {
             IntSerDeUtils.putInt(buffer.array(), 
FrameHelper.getTupleCountOffset(buffer.capacity()), 0);
-            physicalFrameOffsets.add(new PhysicalFrameOffset(buffer, 0));
-            return physicalFrameOffsets.size() - 1;
+            if (numPhysicalFrames < physicalFrames.size()) {
+                physicalFrames.get(numPhysicalFrames).reset(buffer, 0);
+            } else {
+                physicalFrames.add(new PhysicalFrameOffset(buffer, 0));
+            }
+            numPhysicalFrames++;
+            return numPhysicalFrames - 1; // returns the index of the physical 
frame appended
         }
         return -1;
     }
 
     @Override
     public void reset() throws HyracksDataException {
-        physicalFrameOffsets.clear();
-        logicalFrameStartSizes.clear();
+        numPhysicalFrames = 0;
+        numLogicalFrames = 0;
         freeSlotPolicy.reset();
         framePool.reset();
     }
 
     @Override
     public BufferInfo getFrame(int frameIndex, BufferInfo info) {
-        info.reset(logicalFrameStartSizes.get(frameIndex));
+        if (frameIndex >= numLogicalFrames) {
+            throw new IndexOutOfBoundsException();
+        }
+        info.reset(logicalFrames.get(frameIndex));
         return info;
     }
 
     @Override
     public int getNumFrames() {
-        return logicalFrameStartSizes.size();
+        return numLogicalFrames;
     }
 
     @Override
@@ -91,7 +104,7 @@ public class VariableFrameMemoryManager implements 
IFrameBufferManager {
         if (physicalFrameId < 0) {
             return -1;
         }
-        PhysicalFrameOffset frameOffset = 
physicalFrameOffsets.get(physicalFrameId);
+        PhysicalFrameOffset frameOffset = physicalFrames.get(physicalFrameId);
         ByteBuffer buffer = frameOffset.physicalFrame;
         int offset = frameOffset.physicalOffset;
         System.arraycopy(frame.array(), 0, buffer.array(), offset, frameSize);
@@ -99,15 +112,22 @@ public class VariableFrameMemoryManager implements 
IFrameBufferManager {
             freeSlotPolicy.pushNewFrame(physicalFrameId, buffer.capacity() - 
offset - frameSize);
         }
         frameOffset.physicalOffset = offset + frameSize;
-        logicalFrameStartSizes.add(new BufferInfo(buffer, offset, frameSize));
-        return logicalFrameStartSizes.size() - 1;
+        if (numLogicalFrames < logicalFrames.size()) {
+            logicalFrames.get(numLogicalFrames).reset(buffer, offset, 
frameSize);
+        } else {
+            logicalFrames.add(new BufferInfo(buffer, offset, frameSize));
+        }
+        numLogicalFrames++;
+        return numLogicalFrames - 1; // returns the index of the logical frame 
appended
     }
 
     @Override
     public void close() {
-        physicalFrameOffsets.clear();
-        logicalFrameStartSizes.clear();
-        freeSlotPolicy.reset();
+        numPhysicalFrames = 0;
+        numLogicalFrames = 0;
+        physicalFrames.clear();
+        logicalFrames.clear();
+        freeSlotPolicy.close();
         framePool.close();
     }
 }

Reply via email to