This is an automated email from the ASF dual-hosted git repository. alsay pushed a commit to branch compressed_iterator in repository https://gitbox.apache.org/repos/asf/datasketches-java.git
commit adff89079a8439ce18ab373a34ab1730a8aa8f07 Author: AlexanderSaydakov <[email protected]> AuthorDate: Sun Jan 26 13:26:35 2025 -0800 use iteration in set ops, wrap compressed sketch and unpack in iterator --- .../org/apache/datasketches/theta/AnotBimpl.java | 34 ++++++++++--- .../datasketches/theta/CompactOperations.java | 2 +- .../apache/datasketches/theta/CompactSketch.java | 18 +++---- .../datasketches/theta/DirectCompactSketch.java | 12 ++--- .../datasketches/theta/IntersectionImpl.java | 59 +++++++++++++--------- .../apache/datasketches/theta/PreambleUtil.java | 3 ++ .../java/org/apache/datasketches/theta/Sketch.java | 13 ++--- .../org/apache/datasketches/theta/UnionImpl.java | 55 ++++---------------- 8 files changed, 92 insertions(+), 104 deletions(-) diff --git a/src/main/java/org/apache/datasketches/theta/AnotBimpl.java b/src/main/java/org/apache/datasketches/theta/AnotBimpl.java index e7b2c99e..6d4c54fd 100644 --- a/src/main/java/org/apache/datasketches/theta/AnotBimpl.java +++ b/src/main/java/org/apache/datasketches/theta/AnotBimpl.java @@ -20,8 +20,11 @@ package org.apache.datasketches.theta; import static org.apache.datasketches.common.Util.exactLog2OfLong; -import static org.apache.datasketches.thetacommon.HashOperations.convertToHashTable; +import static org.apache.datasketches.thetacommon.HashOperations.checkThetaCorruption; +import static org.apache.datasketches.thetacommon.HashOperations.continueCondition; import static org.apache.datasketches.thetacommon.HashOperations.hashSearch; +import static org.apache.datasketches.thetacommon.HashOperations.hashSearchOrInsert; +import static org.apache.datasketches.thetacommon.HashOperations.minLgHashTableSize; import java.util.Arrays; @@ -124,7 +127,7 @@ final class AnotBimpl extends AnotB { if (skB.isEmpty()) { return skA.compact(dstOrdered, dstMem); - } + } ThetaUtil.checkSeedHashes(skB.getSeedHash(), seedHash_); //Both skA & skB are not empty @@ -162,14 +165,12 @@ final class AnotBimpl extends AnotB { final long[] hashArrA, final Sketch skB) { - //Rebuild/get hashtable of skB + // Rebuild or get hashtable of skB final long[] hashTableB; //read only - final long[] thetaCache = skB.getCache(); - final int countB = skB.getRetainedEntries(true); if (skB instanceof CompactSketch) { - hashTableB = convertToHashTable(thetaCache, countB, minThetaLong, ThetaUtil.REBUILD_THRESHOLD); + hashTableB = convertToHashTable(skB, minThetaLong, ThetaUtil.REBUILD_THRESHOLD); } else { - hashTableB = thetaCache; + hashTableB = skB.getCache(); } //build temporary result arrays of skA @@ -191,6 +192,25 @@ final class AnotBimpl extends AnotB { return Arrays.copyOfRange(tmpHashArrA, 0, nonMatches); } + private static long[] convertToHashTable( + final Sketch sketch, + final long thetaLong, + final double rebuildThreshold) { + final int lgArrLongs = minLgHashTableSize(sketch.getRetainedEntries(true), rebuildThreshold); + final int arrLongs = 1 << lgArrLongs; + final long[] hashTable = new long[arrLongs]; + checkThetaCorruption(thetaLong); + HashIterator it = sketch.iterator(); + while (it.next()) { + final long hash = it.get(); + if (continueCondition(thetaLong, hash) ) { + continue; + } + hashSearchOrInsert(hashTable, lgArrLongs, hash); + } + return hashTable; + } + private void reset() { thetaLong_ = Long.MAX_VALUE; empty_ = true; diff --git a/src/main/java/org/apache/datasketches/theta/CompactOperations.java b/src/main/java/org/apache/datasketches/theta/CompactOperations.java index a8066314..2b52f59f 100644 --- a/src/main/java/org/apache/datasketches/theta/CompactOperations.java +++ b/src/main/java/org/apache/datasketches/theta/CompactOperations.java @@ -161,7 +161,7 @@ final class CompactOperations { final long hash = srcMem.getLong(srcPreLongs << 3); final SingleItemSketch sis = new SingleItemSketch(hash, srcSeedHash); if (dstMem != null) { - dstMem.putByteArray(0, sis.toByteArray(),0, 16); + dstMem.putByteArray(0, sis.toByteArray(), 0, 16); return new DirectCompactSketch(dstMem); } else { //heap return sis; diff --git a/src/main/java/org/apache/datasketches/theta/CompactSketch.java b/src/main/java/org/apache/datasketches/theta/CompactSketch.java index 1426368f..688ad274 100644 --- a/src/main/java/org/apache/datasketches/theta/CompactSketch.java +++ b/src/main/java/org/apache/datasketches/theta/CompactSketch.java @@ -32,6 +32,7 @@ import static org.apache.datasketches.theta.PreambleUtil.extractSerVer; import static org.apache.datasketches.theta.PreambleUtil.extractEntryBitsV4; import static org.apache.datasketches.theta.PreambleUtil.extractNumEntriesBytesV4; import static org.apache.datasketches.theta.PreambleUtil.extractThetaLongV4; +import static org.apache.datasketches.theta.PreambleUtil.wholeBytesToHoldBits; import static org.apache.datasketches.theta.SingleItemSketch.otherCheckForSingleItem; import org.apache.datasketches.common.Family; @@ -189,7 +190,8 @@ public abstract class CompactSketch extends Sketch { if (serVer == 4) { // not wrapping the compressed format since currently we cannot take advantage of // decompression during iteration because set operations reach into memory directly - return heapifyV4(srcMem, seed, enforceSeed); + return DirectCompactCompressedSketch.wrapInstance(srcMem, + enforceSeed ? seedHash : (short) extractSeedHash(srcMem)); } else if (serVer == 3) { if (PreambleUtil.isEmptyFlag(srcMem)) { @@ -274,10 +276,6 @@ public abstract class CompactSketch extends Sketch { return Long.numberOfLeadingZeros(ored); } - private static int wholeBytesToHoldBits(final int bits) { - return (bits >>> 3) + ((bits & 7) > 0 ? 1 : 0); - } - private byte[] toByteArrayV4() { final int preambleLongs = isEstimationMode() ? 2 : 1; final int entryBits = 64 - computeMinLeadingZeros(); @@ -286,8 +284,8 @@ public abstract class CompactSketch extends Sketch { // store num_entries as whole bytes since whole-byte blocks will follow (most probably) final int numEntriesBytes = wholeBytesToHoldBits(32 - Integer.numberOfLeadingZeros(getRetainedEntries())); - final int size = preambleLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(compressedBits); - final byte[] bytes = new byte[size]; + final int sizeBytes = preambleLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(compressedBits); + final byte[] bytes = new byte[sizeBytes]; final WritableMemory mem = WritableMemory.writableWrap(bytes); int offsetBytes = 0; mem.putByte(offsetBytes++, (byte) preambleLongs); @@ -334,12 +332,10 @@ public abstract class CompactSketch extends Sketch { private static CompactSketch heapifyV4(final Memory srcMem, final long seed, final boolean enforceSeed) { final int preLongs = extractPreLongs(srcMem); - final int flags = extractFlags(srcMem); final int entryBits = extractEntryBitsV4(srcMem); final int numEntriesBytes = extractNumEntriesBytesV4(srcMem); final short seedHash = (short) extractSeedHash(srcMem); - final boolean isEmpty = (flags & EMPTY_FLAG_MASK) > 0; - if (enforceSeed && !isEmpty) { PreambleUtil.checkMemorySeedHash(srcMem, seed); } + if (enforceSeed) { PreambleUtil.checkMemorySeedHash(srcMem, seed); } int offsetBytes = 8; long theta = Long.MAX_VALUE; if (preLongs > 1) { @@ -374,7 +370,7 @@ public abstract class CompactSketch extends Sketch { entries[i] += previous; previous = entries[i]; } - return new HeapCompactSketch(entries, isEmpty, seedHash, numEntries, theta, true); + return new HeapCompactSketch(entries, false, seedHash, numEntries, theta, true); } } diff --git a/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java b/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java index 0f69ec3c..1714d216 100644 --- a/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java +++ b/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java @@ -86,11 +86,7 @@ class DirectCompactSketch extends CompactSketch { @Override public double getEstimate() { - if (otherCheckForSingleItem(mem_)) { return 1; } - final int preLongs = extractPreLongs(mem_); - final int curCount = (preLongs == 1) ? 0 : extractCurCount(mem_); - final long thetaLong = (preLongs > 2) ? extractThetaLong(mem_) : Long.MAX_VALUE; - return Sketch.estimate(thetaLong, curCount); + return Sketch.estimate(getThetaLong(), getRetainedEntries()); } @Override @@ -142,10 +138,8 @@ class DirectCompactSketch extends CompactSketch { @Override public byte[] toByteArray() { - final int curCount = getRetainedEntries(true); - checkIllegalCurCountAndEmpty(isEmpty(), curCount); - final int preLongs = extractPreLongs(mem_); - final int outBytes = (curCount + preLongs) << 3; + checkIllegalCurCountAndEmpty(isEmpty(), getRetainedEntries()); + final int outBytes = getCurrentBytes(); final byte[] byteArrOut = new byte[outBytes]; mem_.getByteArray(0, byteArrOut, 0, outBytes); return byteArrOut; diff --git a/src/main/java/org/apache/datasketches/theta/IntersectionImpl.java b/src/main/java/org/apache/datasketches/theta/IntersectionImpl.java index fc81d112..3404a0b1 100644 --- a/src/main/java/org/apache/datasketches/theta/IntersectionImpl.java +++ b/src/main/java/org/apache/datasketches/theta/IntersectionImpl.java @@ -288,7 +288,7 @@ class IntersectionImpl extends Intersection { else { //On the heap, allocate a HT hashTable_ = new long[1 << lgArrLongs_]; } - moveDataToTgt(sketchIn.getCache(), curCount_); + moveDataToTgt(sketchIn); } //end of state 5 //state 7 @@ -434,8 +434,6 @@ class IntersectionImpl extends Intersection { private void performIntersect(final Sketch sketchIn) { // curCount and input data are nonzero, match against HT assert curCount_ > 0 && !empty_; - final long[] cacheIn = sketchIn.getCache(); - final int arrLongsIn = cacheIn.length; final long[] hashTable; if (wmem_ != null) { final int htLen = 1 << lgArrLongs_; @@ -448,27 +446,16 @@ class IntersectionImpl extends Intersection { final long[] matchSet = new long[ min(curCount_, sketchIn.getRetainedEntries(true)) ]; int matchSetCount = 0; - if (sketchIn.isOrdered()) { - //ordered compact, which enables early stop - for (int i = 0; i < arrLongsIn; i++ ) { - final long hashIn = cacheIn[i]; - //if (hashIn <= 0L) continue; //<= 0 should not happen - if (hashIn >= thetaLong_) { - break; //early stop assumes that hashes in input sketch are ordered! - } + HashIterator it = sketchIn.iterator(); + while (it.next()) { + final long hashIn = it.get(); + if (hashIn < thetaLong_) { final int foundIdx = hashSearch(hashTable, lgArrLongs_, hashIn); - if (foundIdx == -1) { continue; } - matchSet[matchSetCount++] = hashIn; - } - } - else { - //either unordered compact or hash table - for (int i = 0; i < arrLongsIn; i++ ) { - final long hashIn = cacheIn[i]; - if (hashIn <= 0L || hashIn >= thetaLong_) { continue; } - final int foundIdx = hashSearch(hashTable, lgArrLongs_, hashIn); - if (foundIdx == -1) { continue; } - matchSet[matchSetCount++] = hashIn; + if (foundIdx != -1) { + matchSet[matchSetCount++] = hashIn; + } + } else { + if (sketchIn.isOrdered()) { break; } // early stop } } //reduce effective array size to minimum @@ -515,6 +502,32 @@ class IntersectionImpl extends Intersection { assert tmpCnt == count : "Intersection Count Check: got: " + tmpCnt + ", expected: " + count; } + private void moveDataToTgt(final Sketch sketch) { + int count = sketch.getRetainedEntries(); + int tmpCnt = 0; + if (wmem_ != null) { //Off Heap puts directly into mem + final int preBytes = CONST_PREAMBLE_LONGS << 3; + final int lgArrLongs = lgArrLongs_; + final long thetaLong = thetaLong_; + HashIterator it = sketch.iterator(); + while (it.next()) { + final long hash = it.get(); + if (continueCondition(thetaLong, hash)) { continue; } + hashInsertOnlyMemory(wmem_, lgArrLongs, hash, preBytes); + tmpCnt++; + } + } else { //On Heap. Assumes HT exists and is large enough + HashIterator it = sketch.iterator(); + while (it.next()) { + final long hash = it.get(); + if (continueCondition(thetaLong_, hash)) { continue; } + hashInsertOnly(hashTable_, lgArrLongs_, hash); + tmpCnt++; + } + } + assert tmpCnt == count : "Intersection Count Check: got: " + tmpCnt + ", expected: " + count; + } + private void hardReset() { resetCommon(); if (wmem_ != null) { diff --git a/src/main/java/org/apache/datasketches/theta/PreambleUtil.java b/src/main/java/org/apache/datasketches/theta/PreambleUtil.java index e1d9262e..ec0bc126 100644 --- a/src/main/java/org/apache/datasketches/theta/PreambleUtil.java +++ b/src/main/java/org/apache/datasketches/theta/PreambleUtil.java @@ -524,4 +524,7 @@ final class PreambleUtil { + ", Required: " + required); } + static int wholeBytesToHoldBits(final int bits) { + return (bits >>> 3) + ((bits & 7) > 0 ? 1 : 0); + } } diff --git a/src/main/java/org/apache/datasketches/theta/Sketch.java b/src/main/java/org/apache/datasketches/theta/Sketch.java index 89618bc2..6583e2db 100644 --- a/src/main/java/org/apache/datasketches/theta/Sketch.java +++ b/src/main/java/org/apache/datasketches/theta/Sketch.java @@ -451,9 +451,8 @@ public abstract class Sketch implements MemoryStatus { final boolean hexMode) { final StringBuilder sb = new StringBuilder(); - final long[] cache = getCache(); int nomLongs = 0; - int arrLongs = cache.length; + int arrLongs = 0; float p = 0; int rf = 0; final boolean updateSketch = this instanceof UpdateSketch; @@ -473,12 +472,10 @@ public abstract class Sketch implements MemoryStatus { final int w = width > 0 ? width : 8; // default is 8 wide if (curCount > 0) { sb.append("### SKETCH DATA DETAIL"); - for (int i = 0, j = 0; i < arrLongs; i++ ) { - final long h; - h = cache[i]; - if (h <= 0 || h >= thetaLong) { - continue; - } + HashIterator it = iterator(); + int j = 0; + while (it.next()) { + final long h = it.get(); if (j % w == 0) { sb.append(LS).append(String.format(" %6d", j + 1)); } diff --git a/src/main/java/org/apache/datasketches/theta/UnionImpl.java b/src/main/java/org/apache/datasketches/theta/UnionImpl.java index bac05de7..4fee6a9c 100644 --- a/src/main/java/org/apache/datasketches/theta/UnionImpl.java +++ b/src/main/java/org/apache/datasketches/theta/UnionImpl.java @@ -320,46 +320,17 @@ final class UnionImpl extends Union { } //sketchIn is valid and not empty ThetaUtil.checkSeedHashes(expectedSeedHash_, sketchIn.getSeedHash()); - if (sketchIn instanceof SingleItemSketch) { - gadget_.hashUpdate(sketchIn.getCache()[0]); - return; - } Sketch.checkSketchAndMemoryFlags(sketchIn); unionThetaLong_ = min(min(unionThetaLong_, sketchIn.getThetaLong()), gadget_.getThetaLong()); //Theta rule unionEmpty_ = false; - final int curCountIn = sketchIn.getRetainedEntries(true); - if (curCountIn > 0) { - if (sketchIn.isOrdered() && (sketchIn instanceof CompactSketch)) { //Use early stop - //Ordered, thus compact - if (sketchIn.hasMemory()) { - final Memory skMem = sketchIn.getMemory(); - final int preambleLongs = skMem.getByte(PREAMBLE_LONGS_BYTE) & 0X3F; - for (int i = 0; i < curCountIn; i++ ) { - final int offsetBytes = preambleLongs + i << 3; - final long hashIn = skMem.getLong(offsetBytes); - if (hashIn >= unionThetaLong_) { break; } // "early stop" - gadget_.hashUpdate(hashIn); //backdoor update, hash function is bypassed - } - } - else { //sketchIn is on the Java Heap or has array - final long[] cacheIn = sketchIn.getCache(); //not a copy! - for (int i = 0; i < curCountIn; i++ ) { - final long hashIn = cacheIn[i]; - if (hashIn >= unionThetaLong_) { break; } // "early stop" - gadget_.hashUpdate(hashIn); //backdoor update, hash function is bypassed - } - } - } //End ordered, compact - else { //either not-ordered compact or Hash Table form. A HT may have dirty values. - final long[] cacheIn = sketchIn.getCache(); //if off-heap this will be a copy - final int arrLongs = cacheIn.length; - for (int i = 0, c = 0; i < arrLongs && c < curCountIn; i++ ) { - final long hashIn = cacheIn[i]; - if (hashIn <= 0L || hashIn >= unionThetaLong_) { continue; } //rejects dirty values - gadget_.hashUpdate(hashIn); //backdoor update, hash function is bypassed - c++; //ensures against invalid state inside the incoming sketch - } + HashIterator it = sketchIn.iterator(); + while (it.next()) { + final long hash = it.get(); + if (hash < unionThetaLong_ && hash < gadget_.getThetaLong()) { + gadget_.hashUpdate(hash); // backdoor update, hash function is bypassed + } else { + if (sketchIn.isOrdered()) { break; } } } unionThetaLong_ = min(unionThetaLong_, gadget_.getThetaLong()); //Theta rule with gadget @@ -379,11 +350,8 @@ final class UnionImpl extends Union { final int fam = extractFamilyID(skMem); if (serVer == 4) { // compressed ordered compact - // performance can be improved by decompression while performing the union - // potentially only partial decompression might be needed ThetaUtil.checkSeedHashes(expectedSeedHash_, (short) extractSeedHash(skMem)); - final CompactSketch csk = CompactSketch.wrap(skMem); - union(csk); + union(CompactSketch.wrap(skMem)); return; } if (serVer == 3) { //The OpenSource sketches (Aug 4, 2015) starts with serVer = 3 @@ -396,16 +364,13 @@ final class UnionImpl extends Union { } if (serVer == 2) { //older Sketch, which is compact and ordered ThetaUtil.checkSeedHashes(expectedSeedHash_, (short)extractSeedHash(skMem)); - final CompactSketch csk = ForwardCompatibility.heapify2to3(skMem, expectedSeedHash_); - union(csk); + union(ForwardCompatibility.heapify2to3(skMem, expectedSeedHash_)); return; } if (serVer == 1) { //much older Sketch, which is compact and ordered, no seedHash - final CompactSketch csk = ForwardCompatibility.heapify1to3(skMem, expectedSeedHash_); - union(csk); + union(ForwardCompatibility.heapify1to3(skMem, expectedSeedHash_)); return; } - throw new SketchesArgumentException("SerVer is unknown: " + serVer); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
