Repository: asterixdb Updated Branches: refs/heads/master 833f1f2d6 -> d810df797
Removed some new byte[] creation and implemented explicit resource release - Removed some new byte[] creation (mainly for toByteArray() method) - Implemented Explicit resource release during a hash join - Refactorered Hash-join code to remove repetitive same condition check Change-Id: I55195696a3db09c14b8debdd78f5f68d701b9129 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1378 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Ian Maxon <ima...@apache.org> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/d810df79 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/d810df79 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/d810df79 Branch: refs/heads/master Commit: d810df797ea3c7960e1983f634278f0b7929c7bd Parents: 833f1f2 Author: Taewoo Kim <wangs...@yahoo.com> Authored: Thu Dec 8 15:51:57 2016 -0800 Committer: Taewoo Kim <wangs...@yahoo.com> Committed: Thu Dec 8 17:55:52 2016 -0800 ---------------------------------------------------------------------- .../lang/aql/util/AQLFormatPrintUtil.java | 4 +- .../lang/sqlpp/util/SqlppAstPrintUtil.java | 2 +- .../lang/sqlpp/util/SqlppFormatPrintUtil.java | 2 +- .../om/pointables/cast/ARecordCaster.java | 7 +-- .../functions/ReplicationProtocol.java | 22 ++++----- .../std/util/ExtendedByteArrayOutputStream.java | 47 ++++++++++++++++++++ .../buffermanager/DeallocatableFramePool.java | 7 ++- .../VPartitionTupleBufferManager.java | 5 +++ .../dataflow/std/join/InMemoryHashJoin.java | 42 ++++++++++++----- .../std/join/OptimizedHybridHashJoin.java | 10 +++-- ...timizedHybridHashJoinOperatorDescriptor.java | 1 + .../std/structures/SerializableHashTable.java | 11 +++-- 12 files changed, 121 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d810df79/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLFormatPrintUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLFormatPrintUtil.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLFormatPrintUtil.java index c97e642..bd5ea2d 100644 --- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLFormatPrintUtil.java +++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLFormatPrintUtil.java @@ -50,7 +50,7 @@ public class AQLFormatPrintUtil { expr.accept(visitor, 0); } output.close(); - return new String(bos.toByteArray()); + return bos.toString(); } public static String toSQLPPString(List<Statement> exprs) throws AsterixException { @@ -61,6 +61,6 @@ public class AQLFormatPrintUtil { expr.accept(visitor, 0); } output.close(); - return new String(bos.toByteArray()); + return bos.toString(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d810df79/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppAstPrintUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppAstPrintUtil.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppAstPrintUtil.java index f700439..0d840cd 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppAstPrintUtil.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppAstPrintUtil.java @@ -96,7 +96,7 @@ public class SqlppAstPrintUtil { expr.accept(visitor, 0); } output.close(); - return new String(bos.toByteArray()); + return bos.toString(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d810df79/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppFormatPrintUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppFormatPrintUtil.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppFormatPrintUtil.java index ff0a8e1..2c8f582 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppFormatPrintUtil.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppFormatPrintUtil.java @@ -90,7 +90,7 @@ public class SqlppFormatPrintUtil { expr.accept(visitor, 0); } output.close(); - return new String(bos.toByteArray()); + return bos.toString(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d810df79/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java index 7e1fe46..d18b4d1 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java @@ -269,11 +269,8 @@ class ARecordCaster { .deserialize(fieldType.getByteArray()[fieldType.getStartOffset()]); ps.print(typeTag); - //collect the output message - byte[] output = fieldBos.toByteArray(); - - //throw the exception - throw new IllegalStateException("type mismatch: including an extra field " + new String(output)); + //collect the output message and throw the exception + throw new IllegalStateException("type mismatch: including an extra field " + fieldBos.toString()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d810df79/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java index 608e442..8a52529 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java @@ -19,7 +19,6 @@ package org.apache.asterix.replication.functions; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -30,6 +29,7 @@ import org.apache.asterix.common.replication.ReplicaEvent; import org.apache.asterix.replication.management.NetworkingUtil; import org.apache.asterix.replication.storage.LSMComponentProperties; import org.apache.asterix.replication.storage.LSMIndexFileProperties; +import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream; public class ReplicationProtocol { @@ -84,7 +84,7 @@ public class ReplicationProtocol { public static ByteBuffer writeLSMComponentPropertiesRequest(LSMComponentProperties lsmCompProp, ByteBuffer buffer) throws IOException { - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(); try (DataOutputStream oos = new DataOutputStream(outputStream)) { lsmCompProp.serialize(oos); int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size(); @@ -95,7 +95,7 @@ public class ReplicationProtocol { } buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal()); buffer.putInt(oos.size()); - buffer.put(outputStream.toByteArray()); + buffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); buffer.flip(); return buffer; } @@ -132,7 +132,7 @@ public class ReplicationProtocol { public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, LSMIndexFileProperties afp, ReplicationRequestType requestType) throws IOException { - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(); try (DataOutputStream oos = new DataOutputStream(outputStream)) { afp.serialize(oos); int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size(); @@ -143,7 +143,7 @@ public class ReplicationProtocol { } requestBuffer.putInt(requestType.ordinal()); requestBuffer.putInt(oos.size()); - requestBuffer.put(outputStream.toByteArray()); + requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); requestBuffer.flip(); return requestBuffer; } @@ -156,13 +156,13 @@ public class ReplicationProtocol { } public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException { - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(); try (DataOutputStream oos = new DataOutputStream(outputStream)) { event.serialize(oos); ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size()); buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal()); buffer.putInt(oos.size()); - buffer.put(outputStream.toByteArray()); + buffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); buffer.flip(); return buffer; } @@ -177,7 +177,7 @@ public class ReplicationProtocol { public static ByteBuffer writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request) throws IOException { - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(); try (DataOutputStream oos = new DataOutputStream(outputStream)) { request.serialize(oos); @@ -189,7 +189,7 @@ public class ReplicationProtocol { } buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal()); buffer.putInt(oos.size()); - buffer.put(outputStream.toByteArray()); + buffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); buffer.flip(); return buffer; } @@ -197,7 +197,7 @@ public class ReplicationProtocol { public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request) throws IOException { - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(); try (DataOutputStream oos = new DataOutputStream(outputStream)) { request.serialize(oos); int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size(); @@ -208,7 +208,7 @@ public class ReplicationProtocol { } buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal()); buffer.putInt(oos.size()); - buffer.put(outputStream.toByteArray()); + buffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); buffer.flip(); return buffer; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d810df79/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ExtendedByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ExtendedByteArrayOutputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ExtendedByteArrayOutputStream.java new file mode 100644 index 0000000..59c7786 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ExtendedByteArrayOutputStream.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.data.std.util; + +import java.io.ByteArrayOutputStream; + +/** + * This is an extended class of ByteArrayOutputStream class that can return the current buffer array and its length. + * Use this class to avoid a new byte[] creation when using toArray() method. + */ +public class ExtendedByteArrayOutputStream extends ByteArrayOutputStream { + + public ExtendedByteArrayOutputStream() { + super(); + } + + public ExtendedByteArrayOutputStream(int size) { + super(size); + } + + public synchronized byte[] getByteArray() { + return buf; + } + + /** + * Returns the current length of this stream (not capacity). + */ + public synchronized int getLength() { + return count; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d810df79/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java index 626edba..4499e32 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java @@ -117,7 +117,12 @@ public class DeallocatableFramePool implements IDeallocatableFramePool { @Override public void close() { - buffers.clear(); + for (Iterator<ByteBuffer> iter = buffers.iterator(); iter.hasNext();) { + ByteBuffer next = iter.next(); + ctx.deallocateFrames(next.capacity()); + iter.remove(); + } allocated = 0; + buffers.clear(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d810df79/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java index 1ed34f6..4d4f279 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java @@ -210,6 +210,11 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana @Override public void close() { + for (IFrameBufferManager part : partitionArray) { + if (part != null) { + part.close(); + } + } framePool.close(); Arrays.fill(partitionArray, null); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d810df79/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java index 0da9da4..ed7ae8e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java @@ -57,6 +57,7 @@ public class InMemoryHashJoin { private final TuplePointer storedTuplePointer; private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output private final IPredicateEvaluator predEvaluator; + private final boolean isTableCapacityNotZero; private static final Logger LOGGER = Logger.getLogger(InMemoryHashJoin.class.getName()); @@ -97,6 +98,11 @@ public class InMemoryHashJoin { missingTupleBuild = null; } reverseOutputOrder = reverse; + if (tableSize != 0) { + isTableCapacityNotZero = true; + } else { + isTableCapacityNotZero = false; + } LOGGER.fine("InMemoryHashJoin has been created for a table size of " + tableSize + " for Thread ID " + Thread.currentThread().getId() + "."); } @@ -113,17 +119,17 @@ public class InMemoryHashJoin { } } - void join(IFrameTupleAccessor accessorProbe, int tid, IFrameWriter writer) throws HyracksDataException { - this.accessorProbe = accessorProbe; + /** + * Reads the given tuple from the probe side and joins it with tuples from the build side. + * This method assumes that the accessorProbe is already set to the current probe frame. + */ + void join(int tid, IFrameWriter writer) throws HyracksDataException { boolean matchFound = false; - if (tableSize != 0) { + if (isTableCapacityNotZero) { int entry = tpcProbe.partition(accessorProbe, tid, tableSize); - int offset = 0; - do { - table.getTuplePointer(entry, offset++, storedTuplePointer); - if (storedTuplePointer.getFrameIndex() < 0) { - break; - } + int tupleCount = table.getTupleCount(entry); + for (int i = 0; i < tupleCount; i++) { + table.getTuplePointer(entry, i, storedTuplePointer); int bIndex = storedTuplePointer.getFrameIndex(); int tIndex = storedTuplePointer.getTupleIndex(); accessorBuild.reset(buffers.get(bIndex)); @@ -135,7 +141,7 @@ public class InMemoryHashJoin { appendToResult(tid, tIndex, writer); } } - } while (true); + } } if (!matchFound && isLeftOuter) { FrameUtils.appendConcatToWriter(writer, appender, accessorProbe, tid, @@ -148,19 +154,31 @@ public class InMemoryHashJoin { accessorProbe.reset(buffer); int tupleCount0 = accessorProbe.getTupleCount(); for (int i = 0; i < tupleCount0; ++i) { - join(accessorProbe, i, writer); + join(i, writer); } } + public void resetAccessorProbe(IFrameTupleAccessor newAccessorProbe) { + accessorProbe.reset(newAccessorProbe.getBuffer()); + } + public void closeJoin(IFrameWriter writer) throws HyracksDataException { appender.write(writer, true); int nFrames = buffers.size(); + int totalSize = 0; + for (int i = 0; i < nFrames; i++) { + totalSize += buffers.get(i).capacity(); + } buffers.clear(); - ctx.deallocateFrames(nFrames); + ctx.deallocateFrames(totalSize); LOGGER.fine("InMemoryHashJoin has finished using " + nFrames + " frames for Thread ID " + Thread.currentThread().getId() + "."); } + public void closeTable() throws HyracksDataException { + table.close(); + } + private boolean evaluatePredicate(int tIx1, int tIx2) { if (reverseOutputOrder) { //Role Reversal Optimization is triggered return (predEvaluator == null) || predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d810df79/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java index b80059b..0770784 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java @@ -20,7 +20,6 @@ package org.apache.hyracks.dataflow.std.join; import java.nio.ByteBuffer; import java.util.BitSet; -import java.util.logging.Logger; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.comm.IFrame; @@ -361,6 +360,7 @@ public class OptimizedHybridHashJoin { inMemJoiner.join(buffer, writer); return; } + inMemJoiner.resetAccessorProbe(accessorProbe); for (int i = 0; i < tupleCount; ++i) { int pid = probeHpc.partition(accessorProbe, i, numOfPartitions); @@ -380,7 +380,7 @@ public class OptimizedHybridHashJoin { bufferManager.clearPartition(victim); } } else { //pid is Resident - inMemJoiner.join(accessorProbe, i, writer); + inMemJoiner.join(i, writer); } probePSizeInTups[pid]++; } @@ -405,9 +405,13 @@ public class OptimizedHybridHashJoin { } public void closeProbe(IFrameWriter writer) throws HyracksDataException { - //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use) + //We do NOT join the spilled partitions here, that decision is made at the descriptor level + //(which join technique to use) inMemJoiner.closeJoin(writer); + inMemJoiner.closeTable(); closeAllSpilledPartitions(SIDE.PROBE); + bufferManager.close(); + inMemJoiner = null; bufferManager = null; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d810df79/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java index 183d7f6..e308dd8 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java @@ -667,6 +667,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD } pReader.close(); joiner.closeJoin(writer); + joiner.closeTable(); } private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d810df79/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java index b42cdb7..9584f26 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java @@ -173,13 +173,18 @@ public class SerializableHashTable implements ISerializableTable { @Override public void close() { int nFrames = contents.size(); - for (int i = 0; i < headers.length; i++) - headers[i] = null; + int hFrames = 0; + for (int i = 0; i < headers.length; i++) { + if (headers[i] != null) { + hFrames++; + headers[i] = null; + } + } contents.clear(); frameCurrentIndex.clear(); tupleCount = 0; currentLargestFrameIndex = 0; - ctx.deallocateFrames(nFrames); + ctx.deallocateFrames((nFrames + hFrames) * frameCapacity * 4); } private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer)