This is an automated email from the ASF dual-hosted git repository.
luochen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new a806f23 [NO ISSUE][STO] Add concurrency control for filters
a806f23 is described below
commit a806f2316b322b572237df69c7148a1cd2d770ad
Author: luochen <[email protected]>
AuthorDate: Tue Mar 17 14:57:26 2020 -0700
[NO ISSUE][STO] Add concurrency control for filters
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Memory components' filters are mutable and will be updated upon writes.
However, previously there was no concurrency control for concurrent
updates of filters. This patch adds a read-write lock to address this
problem.
Change-Id: I77eb348b84447dc552841f23b3a922a4316fa305
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5363
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Luo Chen <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../am/lsm/common/impls/LSMComponentFilter.java | 150 ++++++++++++---------
1 file changed, 89 insertions(+), 61 deletions(-)
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java
index 6ccb114..12d92a8 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,6 +33,7 @@ import org.apache.hyracks.storage.common.MultiComparator;
public class LSMComponentFilter implements ILSMComponentFilter {
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final IBinaryComparatorFactory[] filterCmpFactories;
private final ITreeIndexTupleWriter tupleWriter;
@@ -55,98 +58,123 @@ public class LSMComponentFilter implements
ILSMComponentFilter {
@Override
public void reset() {
- minTuple = null;
- maxTuple = null;
- minTupleBytes = null;
- maxTupleBytes = null;
- minTupleBuf = null;
- maxTupleBuf = null;
+ lock.writeLock().lock();
+ try {
+ minTuple = null;
+ maxTuple = null;
+ minTupleBytes = null;
+ maxTupleBytes = null;
+ minTupleBuf = null;
+ maxTupleBuf = null;
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
public void update(ITupleReference tuple, MultiComparator cmp,
IExtendedModificationOperationCallback opCallback)
throws HyracksDataException {
- boolean logged = false;
- if (minTuple == null) {
- int numBytes = tupleWriter.bytesRequired(tuple);
- minTupleBytes = new byte[numBytes];
- opCallback.after(tuple);
- logged = true;
- tupleWriter.writeTuple(tuple, minTupleBytes, 0);
- minTupleBuf = ByteBuffer.wrap(minTupleBytes);
- minTuple = tupleWriter.createTupleReference();
- ((ITreeIndexTupleReference)
minTuple).resetByTupleOffset(minTupleBuf.array(), 0);
- } else {
- int c = cmp.compare(tuple, minTuple);
- if (c < 0) {
+ lock.writeLock().lock();
+ try {
+ boolean logged = false;
+ if (minTuple == null) {
+ int numBytes = tupleWriter.bytesRequired(tuple);
+ minTupleBytes = new byte[numBytes];
opCallback.after(tuple);
logged = true;
- int numBytes = tupleWriter.bytesRequired(tuple);
- if (minTupleBytes.length < numBytes) {
- minTupleBytes = new byte[numBytes];
- tupleWriter.writeTuple(tuple, minTupleBytes, 0);
- minTupleBuf = ByteBuffer.wrap(minTupleBytes);
- } else {
- tupleWriter.writeTuple(tuple, minTupleBytes, 0);
- }
+ tupleWriter.writeTuple(tuple, minTupleBytes, 0);
+ minTupleBuf = ByteBuffer.wrap(minTupleBytes);
+ minTuple = tupleWriter.createTupleReference();
((ITreeIndexTupleReference)
minTuple).resetByTupleOffset(minTupleBuf.array(), 0);
- }
- }
- if (maxTuple == null) {
- int numBytes = tupleWriter.bytesRequired(tuple);
- maxTupleBytes = new byte[numBytes];
- if (!logged) {
- opCallback.after(tuple);
- }
- tupleWriter.writeTuple(tuple, maxTupleBytes, 0);
- maxTupleBuf = ByteBuffer.wrap(maxTupleBytes);
- maxTuple = tupleWriter.createTupleReference();
- ((ITreeIndexTupleReference)
maxTuple).resetByTupleOffset(maxTupleBuf.array(), 0);
- } else {
- int c = cmp.compare(tuple, maxTuple);
- if (c > 0) {
- if (!logged) {
+ } else {
+ int c = cmp.compare(tuple, minTuple);
+ if (c < 0) {
opCallback.after(tuple);
+ logged = true;
+ int numBytes = tupleWriter.bytesRequired(tuple);
+ if (minTupleBytes.length < numBytes) {
+ minTupleBytes = new byte[numBytes];
+ tupleWriter.writeTuple(tuple, minTupleBytes, 0);
+ minTupleBuf = ByteBuffer.wrap(minTupleBytes);
+ } else {
+ tupleWriter.writeTuple(tuple, minTupleBytes, 0);
+ }
+ ((ITreeIndexTupleReference)
minTuple).resetByTupleOffset(minTupleBuf.array(), 0);
}
+ }
+ if (maxTuple == null) {
int numBytes = tupleWriter.bytesRequired(tuple);
- if (maxTupleBytes.length < numBytes) {
- maxTupleBytes = new byte[numBytes];
- tupleWriter.writeTuple(tuple, maxTupleBytes, 0);
- maxTupleBuf = ByteBuffer.wrap(maxTupleBytes);
- } else {
- tupleWriter.writeTuple(tuple, maxTupleBytes, 0);
+ maxTupleBytes = new byte[numBytes];
+ if (!logged) {
+ opCallback.after(tuple);
}
+ tupleWriter.writeTuple(tuple, maxTupleBytes, 0);
+ maxTupleBuf = ByteBuffer.wrap(maxTupleBytes);
+ maxTuple = tupleWriter.createTupleReference();
((ITreeIndexTupleReference)
maxTuple).resetByTupleOffset(maxTupleBuf.array(), 0);
+ } else {
+ int c = cmp.compare(tuple, maxTuple);
+ if (c > 0) {
+ if (!logged) {
+ opCallback.after(tuple);
+ }
+ int numBytes = tupleWriter.bytesRequired(tuple);
+ if (maxTupleBytes.length < numBytes) {
+ maxTupleBytes = new byte[numBytes];
+ tupleWriter.writeTuple(tuple, maxTupleBytes, 0);
+ maxTupleBuf = ByteBuffer.wrap(maxTupleBytes);
+ } else {
+ tupleWriter.writeTuple(tuple, maxTupleBytes, 0);
+ }
+ ((ITreeIndexTupleReference)
maxTuple).resetByTupleOffset(maxTupleBuf.array(), 0);
+ }
}
+ } finally {
+ lock.writeLock().unlock();
}
}
@Override
public ITupleReference getMinTuple() {
- return minTuple;
+ lock.readLock().lock();
+ try {
+ return minTuple;
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public ITupleReference getMaxTuple() {
- return maxTuple;
+ lock.readLock().lock();
+ try {
+ return maxTuple;
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public boolean satisfy(ITupleReference minTuple, ITupleReference maxTuple,
MultiComparator filterCmp)
throws HyracksDataException {
- if (maxTuple != null && this.minTuple != null) {
- int c = filterCmp.compare(maxTuple, this.minTuple);
- if (c < 0) {
- return false;
+ lock.readLock().lock();
+ try {
+ if (maxTuple != null && this.minTuple != null) {
+ int c = filterCmp.compare(maxTuple, this.minTuple);
+ if (c < 0) {
+ return false;
+ }
}
- }
- if (minTuple != null && this.maxTuple != null) {
- int c = filterCmp.compare(minTuple, this.maxTuple);
- if (c > 0) {
- return false;
+ if (minTuple != null && this.maxTuple != null) {
+ int c = filterCmp.compare(minTuple, this.maxTuple);
+ if (c > 0) {
+ return false;
+ }
}
+ return true;
+ } finally {
+ lock.readLock().unlock();
}
- return true;
}
}