janniklinde commented on code in PR #2398: URL: https://github.com/apache/systemds/pull/2398#discussion_r2697728126
########## src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCLZW.java: ##########
Review Comment: In general, all methods that are not properly implemented should throw a NotImplementedException. Also, you should implement some of the operations that can be performed directly on the compressed representation (e.g., scalar ops, unary ops, ...). Further, `getExactSizeOnDisk()` should be implemented.

########## src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCLZW.java: ##########
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.colgroup;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import jdk.incubator.vector.DoubleVector;
+import jdk.incubator.vector.VectorSpecies;
+import org.apache.arrow.vector.complex.writer.BitWriter;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupUtils.P;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.IdentityDictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
+import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.colgroup.indexes.RangeIndex;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.colgroup.offset.AOffsetIterator;
+import org.apache.sysds.runtime.compress.colgroup.offset.OffsetFactory;
+import org.apache.sysds.runtime.compress.colgroup.scheme.DDCScheme;
+import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
+import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
+import org.apache.sysds.runtime.compress.estim.EstimationFactors;
+import org.apache.sysds.runtime.compress.estim.encoding.EncodingFactory;
+import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.data.SparseRow;
+import org.apache.sysds.runtime.functionobjects.Builtin;
+import org.apache.sysds.runtime.functionobjects.Minus;
+import org.apache.sysds.runtime.functionobjects.Plus;
+import org.apache.sysds.runtime.matrix.data.LibMatrixMult;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysds.runtime.matrix.operators.RightScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+import org.jboss.netty.handler.codec.compression.CompressionException;
+import shaded.parquet.it.unimi.dsi.fastutil.ints.IntArrayList;
+import shaded.parquet.it.unimi.dsi.fastutil.longs.Long2IntLinkedOpenHashMap;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Stack;
+
+/**
+ * Class to encapsulate information about a column group that is encoded with dense dictionary encoding (DDC) whose
+ * mapping vector is additionally LZW compressed.
+ * Idea:
+ * - DDCLZW stores the mapping vector exclusively in compressed form.
+ * - No persistent MapToData cache is maintained.
+ * - Sequential operations decode on-the-fly, while operations requiring random access explicitly materialize and fall back to DDC.
+ */
+public class ColGroupDDCLZW extends APreAgg implements IMapToDataGroup {
+	private static final long serialVersionUID = -5769772089913918987L;
+
+	private final int[] _dataLZW; // LZW compressed representation of the mapping
+	private final int _nRows; // Number of rows in the mapping vector
+	private final int _nUnique; // Number of unique values in the mapping vector
+
+	// Builds a packed 64-bit key for (prefixCode(w), nextSymbol(k)) pairs used in the LZW dictionary. (TODO)
+	private static long packKey(int prefixCode, int nextSymbol) {
+		return (((long) prefixCode) << 32) | (nextSymbol & 0xffffffffL);
+	}
+
+	// Compresses a mapping (AMapToData) into an LZW-compressed byte/integer/? array. (TODO)
+	private static int[] compress(AMapToData data) {
+		if (data == null)
+			throw new IllegalArgumentException("Invalid input: data is null");
+
+		final int nRows = data.size();
+		if (nRows <= 0) {
+			throw new IllegalArgumentException("Invalid input: data has no rows");
+		}
+
+		final int nUnique = data.getUnique();
+		if (nUnique <= 0) {
+			throw new IllegalArgumentException("Invalid input: data has no unique values");
+		}
+
+		// Fast-path: single symbol
+		if (nRows == 1)
+			return new int[]{data.getIndex(0)};
+
+		// LZW dictionary. Maps (prefixCode, nextSymbol) -> newCode.
+		// Using fastutil keeps lookups fast. (TODO improve time/space complexity)
+		final Long2IntLinkedOpenHashMap dict = new Long2IntLinkedOpenHashMap(1 << 16);
+		dict.defaultReturnValue(-1);
+
+		// Output buffer (heuristic capacity; avoids frequent reallocs)
+		final IntArrayList out = new IntArrayList(Math.max(16, nRows / 2));
+
+		// Codes {0,...,nUnique - 1} are reserved for the original symbols.
+		int nextCode = nUnique;
+
+		// Initialize w with the first input symbol.
+		// AMapToData stores dictionary indices, not actual data values.
+		// Since indices reference positions in an IDictionary, they are always in the valid index range 0 … nUnique−1.
+		int w = data.getIndex(0);
+
+		// Process the remaining input symbols.
+		// Example: _data = [2,0,2,3,0,2,1,0,2].
+		for (int i = 1; i < nRows; i++) {
+			final int k = data.getIndex(i); // next input symbol
+
+			if (k < 0 || k >= nUnique)
+				throw new IllegalArgumentException("Symbol out of range: " + k + " (nUnique=" + nUnique + ")");
+
+			final long key = packKey(w, k); // encode (w,k) into a long key
+
+			int wk = dict.get(key); // check if wk exists in dict
+			if (wk != -1) {
+				w = wk; // wk exists in dict, so replace w by wk and continue.
+			} else {
+				// wk does not exist in dict: output current phrase, add new phrase, restart at k
+				out.add(w);
+				dict.put(key, nextCode++);
+				w = k; // Start new phrase with k
+			}
+		}
+
+		out.add(w);
+		return out.toIntArray();
+	}
+
+	// Unpack upper 32 bits (w) of a (w,k) key pair.
+	private static int unpackfirst(long key) {
+		return (int) (key >>> 32);
+	}
+
+	// Unpack lower 32 bits (k) of a (w,k) key pair.
+	private static int unpacksecond(long key) {
+		return (int) (key);
+	}
+
+	// Append a symbol to the end of an int array.
+	private static int[] packint(int[] arr, int last) {
+		int[] result = Arrays.copyOf(arr, arr.length + 1);
+		result[arr.length] = last;
+		return result;
+	}
+
+	// Reconstruct the phrase for an LZW code.
+	private static int[] unpack(int code, int nUnique, Map<Integer, Long> dict) {
+		// Base symbol (implicit alphabet)
+		if (code < nUnique)
+			return new int[]{code};
+
+		Stack<Integer> stack = new Stack<>();
+		int c = code;
+
+		while (c >= nUnique) {
+			Long key = dict.get(c);
+			if (key == null)
+				throw new IllegalStateException("Missing dictionary entry for code: " + c);
+
+			int symbol = unpacksecond(key);
+			stack.push(symbol);
+			c = unpackfirst(key);
+		}
+
+		// Base symbol
+		stack.push(c);
+		int[] outarray = new int[stack.size()];
+		int i = 0;
+		// Write the symbols to the output in the correct order.
+		while (!stack.isEmpty()) {
+			outarray[i++] = stack.pop();
+		}
+		return outarray;
+	}
+
+	// Decompresses an LZW-compressed vector into its pre-compressed AMapToData form.
+	// TODO: Compatibility with compress() and used data structures. Improve time/space complexity.
+	private static AMapToData decompress(int[] codes, int nUnique, int nRows, int index) {
+		// Validate input arguments.
+		if (codes == null)
+			throw new IllegalArgumentException("codes is null");
+		if (codes.length == 0)
+			throw new IllegalArgumentException("codes is empty");
+		if (nUnique <= 0)
+			throw new IllegalArgumentException("Invalid alphabet size: " + nUnique);
+		if (nRows <= 0) {
+			throw new IllegalArgumentException("Invalid nRows: " + nRows);
+		}
+		if (index > nRows) {
+			throw new IllegalArgumentException("Index is larger than data length: " + index);
+		}
+
+		// Maps: code -> packKey(prefixCode, lastSymbolOfPhrase).
+		// Base symbols (0..nUnique-1) are implicit and not stored here.
+		final Map<Integer, Long> dict = new HashMap<>();
+
+		// Output mapping that will be reconstructed.
+		AMapToData out = MapToFactory.create(index, nUnique);
+		int outPos = 0; // Current write position in the output mapping.
+
+		// Decode the first code. The first code always expands to a valid phrase without needing
+		// any dictionary entries.
+		int old = codes[0];
+		int[] oldPhrase = unpack(old, nUnique, dict);
+		for (int v : oldPhrase) {
+			if (outPos == index) break;
+			out.set(outPos++, v);
+		}
+
+		// Next free dictionary code. Codes 0..nUnique-1 are reserved for base symbols.
+		int nextCode = nUnique;
+
+		// Process remaining codes.
+		for (int i = 1; i < codes.length; i++) {
+			int key = codes[i];
+
+			int[] next;
+			if (key < nUnique || dict.containsKey(key)) {
+				// Normal case: The code is either a base symbol or already present in the dictionary.
+				next = unpack(key, nUnique, dict);
+			} else {
+				// KwKwK special case: The current code refers to a phrase that is being defined right now.
+				// next = oldPhrase + first(oldPhrase).
+				int first = oldPhrase[0];
+				next = packint(oldPhrase, first);
+			}
+
+			// Append the reconstructed phrase to the output mapping.
+			for (int v : next) {
+				if (outPos == index) break;
+				out.set(outPos++, v);
+			}
+
+			// Add new phrase to dictionary: nextCode -> (old, firstSymbol(next)).
+			int first = next[0];
+			dict.put(nextCode++, packKey(old, first));
+
+			// Advance.
+			old = key;
+			oldPhrase = next;
+		}
+
+		// Safety check: the decoder must produce exactly `index` symbols.
+		if (outPos != index)
+			throw new IllegalStateException("Decompression length mismatch: got " + outPos + " expected " + index);
+
+		// Return the reconstructed mapping.
+		return out;
+	}
+
+	// Build Constructor: Used when creating a new DDCLZW instance during compression/build time. (TODO)
+	private ColGroupDDCLZW(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) {
+		super(colIndexes, dict, cachedCounts);
+
+		// Derive metadata
+		_nRows = data.size();
+		_nUnique = dict.getNumberOfValues(colIndexes.size());
+
+		// Compress mapping to LZW
+		_dataLZW = compress(data);
+
+		if (CompressedMatrixBlock.debug) {
+			if (getNumValues() == 0)
+				throw new DMLCompressionException("Invalid construction with empty dictionary");
+			if (_nRows == 0)
+				throw new DMLCompressionException("Invalid length of the data. is zero");
+			if (data.getUnique() != dict.getNumberOfValues(colIndexes.size()))
+				throw new DMLCompressionException("Invalid map to dict Map has:" + data.getUnique() + " while dict has "
+					+ dict.getNumberOfValues(colIndexes.size()));
+			int[] c = getCounts();
+			if (c.length != dict.getNumberOfValues(colIndexes.size()))
+				throw new DMLCompressionException("Invalid DDC Construction");
+			data.verify();
+		}
+	}
+
+	// Read Constructor: Used when creating this group from a serialized form (e.g., reading a compressed matrix from disk/memory stream). (TODO)
+	private ColGroupDDCLZW(IColIndex colIndexes, IDictionary dict, int[] dataLZW, int nRows, int nUnique, int[] cachedCounts) {
+		super(colIndexes, dict, cachedCounts);
+
+		_dataLZW = dataLZW;
+		_nRows = nRows;
+		_nUnique = nUnique;
+
+		if (CompressedMatrixBlock.debug) {
+			if (getNumValues() == 0)
+				throw new DMLCompressionException("Invalid construction with empty dictionary");
+			if (_nRows <= 0)
+				throw new DMLCompressionException("Invalid length of the data. is zero");
+			if (_nUnique <= dict.getNumberOfValues(colIndexes.size()))
+				throw new DMLCompressionException("Invalid map to dict Map has:" + _nUnique + " while dict has "
+					+ dict.getNumberOfValues(colIndexes.size()));
+			int[] c = getCounts();
+			if (c.length != dict.getNumberOfValues(colIndexes.size()))
+				throw new DMLCompressionException("Invalid DDC Construction");
+
+			// Optional: validate that decoding works (expensive)
+			// AMapToData decoded = decode(_dataLZW, _nRows, _nUnique);
+			// decoded.verify();
+		}
+	}
+
+	// Factory method for creating a column group. (AColGroup g = ColGroupDDCLZW.create(...);)
+	public static AColGroup create(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) {
+		if (dict == null)
+			return new ColGroupEmpty(colIndexes);
+		else if (data.getUnique() == 1)
+			return ColGroupConst.create(colIndexes, dict);
+		else
+			return new ColGroupDDCLZW(colIndexes, dict, data, cachedCounts);
+	}
+
+	/*
+	 * TODO: Operations with complex access patterns shall be decompressed to DDC format, i.e.,
+	 * return ColGroupDDC.create(...,decompress(_dataLZW),...). We need to decide which methods are
+	 * suitable for sequential access and which aren't; for those that aren't, we shall materialize and fall back to DDC.
+	 */
+
+	public AColGroup convertToDDC() {
+		final AMapToData map = decompress(_dataLZW, _nUnique, _nRows, _nRows);
+		final int[] counts = getCounts(); // may be null depending on your group
+		return ColGroupDDC.create(_colIndexes, _dict, map, counts);
+	}
+
+	public AColGroup convertToDDC(int index) {
+		final AMapToData map = decompress(_dataLZW, _nUnique, _nRows, index);
+		final int[] counts = getCounts(); // may be null depending on your group
+		return ColGroupDDC.create(_colIndexes, _dict, map, counts);
+	}
+
+	// Deserialize a ColGroupDDCLZW object from a binary stream.
+	public static ColGroupDDCLZW read(DataInput in) throws IOException {
+		final IColIndex colIndexes = ColIndexFactory.read(in);
+		final IDictionary dict = DictionaryFactory.read(in);
+
+		// Metadata for the LZW mapping.
+		final int nRows = in.readInt();
+		final int nUnique = in.readInt();
+
+		// Read the compressed mapping array.
+		final int len = in.readInt();
+		if (len < 0)
+			throw new IOException("Invalid LZW data length: " + len);
+
+		final int[] dataLZW = new int[len];
+		for (int i = 0; i < len; i++)
+			dataLZW[i] = in.readInt();
+
+		// cachedCounts currently not serialized (mirror ColGroupDDC.read which passes null)
+		return new ColGroupDDCLZW(colIndexes, dict, dataLZW, nRows, nUnique, null);
+	}
+
+	// Serialize a ColGroupDDCLZW object into a binary stream.
+	@Override
+	public void write(DataOutput out) throws IOException {
+		_colIndexes.write(out);
+		_dict.write(out);
+		out.writeInt(_nRows);
+		out.writeInt(_nUnique);
+		out.writeInt(_dataLZW.length);
+		for (int i : _dataLZW) out.writeInt(i);
+	}
+
+	@Override
+	public double getIdx(int r, int colIdx) {
+		// TODO: should be fast
+		final AMapToData map = decompress(_dataLZW, _nUnique, _nRows, r);
+		// TODO: ColumnIndex
+		return map.getIndex(r);
+	}
Review Comment: This is incorrect: this method must return the value at the specified index, not the index itself. Also, the current implementation leads to out-of-bounds exceptions. While getIdx() is not an efficient operation on a compressed map-to-data, it should still be possible without having to fully materialize an AMapToData (you can just iterate through the values until you reach the desired index).
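A minimal sketch of the lookup the reviewer describes, not part of the PR: decode phrase by phrase with the PR's own `unpack`/`packint`/`packKey` helpers until row `r` is covered, then translate the mapping index into the actual value. The `_dict.getValue(mapIdx, colIdx, nCols)` call is an assumption that the dictionary API behaves as it does for ColGroupDDC.

```java
@Override
public double getIdx(int r, int colIdx) {
	// Decode phrases sequentially until row r is covered; no full materialization.
	final Map<Integer, Long> dict = new HashMap<>();
	int old = _dataLZW[0];
	int[] phrase = unpack(old, _nUnique, dict);
	int pos = phrase.length; // number of rows decoded so far
	int nextCode = _nUnique;
	for (int i = 1; i < _dataLZW.length && pos <= r; i++) {
		final int code = _dataLZW[i];
		phrase = (code < _nUnique || dict.containsKey(code))
			? unpack(code, _nUnique, dict)
			: packint(phrase, phrase[0]); // KwKwK special case
		dict.put(nextCode++, packKey(old, phrase[0]));
		old = code;
		pos += phrase.length;
	}
	// Row r lies inside the current phrase; map the dictionary index to the value.
	final int mapIdx = phrase[phrase.length - (pos - r)];
	return _dict.getValue(mapIdx, colIdx, _colIndexes.size());
}
```

On the first review comment's `getExactSizeOnDisk()` point, a sketch that simply mirrors `write()` above, assuming the superclass accounts for the column indexes and dictionary as it does for ColGroupDDC:

```java
@Override
public long getExactSizeOnDisk() {
	long ret = super.getExactSizeOnDisk(); // assumption: covers _colIndexes and _dict
	ret += 4 + 4 + 4; // _nRows, _nUnique, _dataLZW.length
	ret += 4L * _dataLZW.length; // the compressed codes
	return ret;
}
```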
########## src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCLZW.java: ##########
Review Comment: Instead of trying to support partial decompression using that `index` variable, it might make more sense to have one `decompress` method for full decompression, and another method that creates an `Iterator` to decompress values on the fly without having to allocate a full decompressed map.
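A sketch of what such an iterator could look like, reusing the PR's `unpack`, `packint`, and `packKey` helpers; the class name and placement are illustrative only. The decoder state (LZW dictionary, current phrase) lives in the iterator, and each `next()` yields one mapping index:

```java
private class LZWDecodeIterator implements java.util.Iterator<Integer> {
	private final Map<Integer, Long> dict = new HashMap<>();
	private int i = 0; // position in _dataLZW
	private int old = -1; // previously decoded code
	private int[] phrase = null; // currently expanded phrase
	private int p = 0; // position inside the current phrase
	private int nextCode = _nUnique; // next free dictionary code
	private int emitted = 0; // mapping indices produced so far

	@Override
	public boolean hasNext() {
		return emitted < _nRows;
	}

	@Override
	public Integer next() {
		if (phrase == null || p == phrase.length) { // expand the next code
			final int code = _dataLZW[i++];
			if (phrase == null) // the first code is always a base symbol
				phrase = unpack(code, _nUnique, dict);
			else {
				phrase = (code < _nUnique || dict.containsKey(code))
					? unpack(code, _nUnique, dict)
					: packint(phrase, phrase[0]); // KwKwK special case
				dict.put(nextCode++, packKey(old, phrase[0]));
			}
			old = code;
			p = 0;
		}
		emitted++;
		return phrase[p++];
	}
}
```

Full decompression then reduces to draining the iterator into a `MapToFactory.create(_nRows, _nUnique)` map, and sequential kernels can consume the iterator directly, without the `index` parameter.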
########## src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCLZW.java: ##########
+	@Override
+	public CompressionType getCompType() {
+		return CompressionType.DDCLZW;
+	}
+
+	@Override
+	protected ColGroupType getColGroupType() {
+		return ColGroupType.DDCLZW;
+	}
+
+	@Override
+	public boolean containsValue(double pattern) {
+		return _dict.containsValue(pattern);
+	}
+
+	@Override
+	public double getCost(ComputationCostEstimator e, int nRows) {
+		final int nVals = getNumValues();
+		final int nCols = getNumCols();
+		return e.getCost(nRows, nRows, nCols, nVals, _dict.getSparsity());
+	}
+
+	@Override
+	public ICLAScheme getCompressionScheme() {
+		// TODO: not implemented in ColGroupDDCFor either - should we create it? Content: ncols as in DDC
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected int numRowsToMultiply() {
+		return _nRows;
+	}
+
+	@Override
+	protected AColGroup copyAndSet(IColIndex colIndexes, IDictionary newDictionary) {
+		return new ColGroupDDCLZW(colIndexes, newDictionary, _dataLZW, _nRows, _nUnique, getCachedCounts());
+	}
+
+	@Override
+	public AMapToData getMapToData() {
+		throw new NotImplementedException(); // or decompress and return data... decompress(_dataLZW, _nUnique, _nRows, _nRows)
+	}
+
+	@Override
+	public boolean sameIndexStructure(AColGroupCompressed that) {
+		return that instanceof ColGroupDDCLZW && ((ColGroupDDCLZW) that)._dataLZW == _dataLZW;
+	}
+
+	@Override
+	protected double computeMxx(double c, Builtin builtin) {
+		return _dict.aggregate(c, builtin);
+	}
+
+	@Override
+	protected void computeColMxx(double[] c, Builtin builtin) {
+		_dict.aggregateCols(c, builtin, _colIndexes);
+	}
+
+	@Override
+	public AColGroup sliceRows(int rl, int ru) {
+		try {
+			AMapToData map = decompress(_dataLZW, _nUnique, _nRows, ru);
+			return ColGroupDDCLZW.create(_colIndexes, _dict, map.slice(rl, ru), null);
+		}
+		catch(Exception e) {
+			throw new DMLRuntimeException("Failed to slice out sub part DDCLZW: " + rl + ", " + ru, e);
+		}
+	}
+
+	@Override
+	protected void decompressToDenseBlockTransposedSparseDictionary(DenseBlock db, int rl, int ru, SparseBlock dict) {
+
+	}
+
+	@Override
+	protected void decompressToDenseBlockTransposedDenseDictionary(DenseBlock db, int rl, int ru, double[] dict) {
+
+	}
+
+	@Override
+	protected void decompressToSparseBlockTransposedSparseDictionary(SparseBlockMCSR db, SparseBlock dict, int nColOut) {
+
+	}
+
+	@Override
+	protected void decompressToSparseBlockTransposedDenseDictionary(SparseBlockMCSR db, double[] dict, int nColOut) {
+
+	}
+
+	@Override
+	protected void decompressToDenseBlockSparseDictionary(DenseBlock db, int rl, int ru, int offR, int offC, SparseBlock sb) {
+
+	}
+
+	@Override
+	protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC, double[] values) {
+
+	}
Review Comment: Decompression to dense block should be supported.
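For the dense-dictionary case flagged above, a sketch patterned after ColGroupDDC's row-major scatter loop, consuming a streaming decoder like the iterator sketched earlier; `lzwIterator()` is a hypothetical factory for it, and the DenseBlock/IColIndex calls are assumed to behave as they do in the DDC implementation:

```java
@Override
protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC, double[] values) {
	final int nCol = _colIndexes.size();
	final java.util.Iterator<Integer> it = lzwIterator(); // hypothetical: new LZWDecodeIterator()
	for (int r = 0; r < rl; r++)
		it.next(); // skip rows before rl
	for (int r = rl; r < ru; r++) {
		final double[] c = db.values(offR + r);
		final int off = db.pos(offR + r) + offC;
		final int rowIdx = it.next() * nCol; // start of this row's dictionary entry
		for (int j = 0; j < nCol; j++)
			c[off + _colIndexes.get(j)] += values[rowIdx + j];
	}
}
```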
+	// Build Constructor: used when creating a new DDCLZW instance during compression/build time. (TODO)
+	private ColGroupDDCLZW(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) {
+		super(colIndexes, dict, cachedCounts);
+
+		// Derive metadata.
+		_nRows = data.size();
+		_nUnique = dict.getNumberOfValues(colIndexes.size());
+
+		// Compress the mapping to LZW.
+		_dataLZW = compress(data);
+
+		if(CompressedMatrixBlock.debug) {
+			if(getNumValues() == 0)
+				throw new DMLCompressionException("Invalid construction with empty dictionary");
+			if(_nRows == 0)
+				throw new DMLCompressionException("Invalid length of the data: is zero");
+			if(data.getUnique() != dict.getNumberOfValues(colIndexes.size()))
+				throw new DMLCompressionException("Invalid map to dict: map has " + data.getUnique() + " while dict has "
+					+ dict.getNumberOfValues(colIndexes.size()));
+			int[] c = getCounts();
+			if(c.length != dict.getNumberOfValues(colIndexes.size()))
+				throw new DMLCompressionException("Invalid DDC Construction");
+			data.verify();
+		}
+	}
+
+	// Read Constructor: used when creating this group from a serialized form (e.g., reading a compressed matrix from disk/memory stream). (TODO)
+	private ColGroupDDCLZW(IColIndex colIndexes, IDictionary dict, int[] dataLZW, int nRows, int nUnique, int[] cachedCounts) {
+		super(colIndexes, dict, cachedCounts);
+
+		_dataLZW = dataLZW;
+		_nRows = nRows;
+		_nUnique = nUnique;
+
+		if(CompressedMatrixBlock.debug) {
+			if(getNumValues() == 0)
+				throw new DMLCompressionException("Invalid construction with empty dictionary");
+			if(_nRows <= 0)
+				throw new DMLCompressionException("Invalid length of the data: is zero");
+			if(_nUnique != dict.getNumberOfValues(colIndexes.size()))
+				throw new DMLCompressionException("Invalid map to dict: map has " + _nUnique + " while dict has "
+					+ dict.getNumberOfValues(colIndexes.size()));

Review Comment: Incorrect
+			int[] c = getCounts();
+			if(c.length != dict.getNumberOfValues(colIndexes.size()))
+				throw new DMLCompressionException("Invalid DDC Construction");
+
+			// Optional: validate that decoding works (expensive).
+			// AMapToData decoded = decompress(_dataLZW, _nUnique, _nRows, _nRows);
+			// decoded.verify();
+		}
+	}
+
+	// Factory method for creating a column group. (AColGroup g = ColGroupDDCLZW.create(...);)
+	public static AColGroup create(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) {
+		if(dict == null)
+			return new ColGroupEmpty(colIndexes);
+		else if(data.getUnique() == 1)
+			return ColGroupConst.create(colIndexes, dict);
+		else
+			return new ColGroupDDCLZW(colIndexes, dict, data, cachedCounts);
+	}
+
+	/*
+	 * TODO: Operations with complex access patterns shall be decompressed to the DDC format, i.e.,
+	 * return ColGroupDDC.create(..., decompress(_dataLZW, ...), ...). We need to decide which methods
+	 * are suitable for sequential access and which are not; the latter shall materialize and fall back to DDC.
+	 */
+
+	public AColGroup convertToDDC() {
+		final AMapToData map = decompress(_dataLZW, _nUnique, _nRows, _nRows);
+		final int[] counts = getCounts(); // may be null depending on the group
+		return ColGroupDDC.create(_colIndexes, _dict, map, counts);
+	}
+
+	public AColGroup convertToDDC(int index) {
+		final AMapToData map = decompress(_dataLZW, _nUnique, _nRows, index);
+		final int[] counts = getCounts(); // may be null depending on the group
+		return ColGroupDDC.create(_colIndexes, _dict, map, counts);
+	}
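For context, a hypothetical call site (variable names illustrative, assumed to exist at the caller) showing how the factory routes to cheaper representations before paying for LZW compression:

    // Hypothetical usage; colIndexes, dict, and mapping are assumptions of this sketch.
    AColGroup g = ColGroupDDCLZW.create(colIndexes, dict, mapping, null);
    // dict == null             -> ColGroupEmpty (nothing to encode)
    // mapping.getUnique() == 1 -> ColGroupConst (a single tuple, no mapping needed)
    // otherwise                -> ColGroupDDCLZW (the mapping is LZW-compressed in the constructor)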
+	// Deserialize a ColGroupDDCLZW object from a binary stream.
+	public static ColGroupDDCLZW read(DataInput in) throws IOException {
+		final IColIndex colIndexes = ColIndexFactory.read(in);
+		final IDictionary dict = DictionaryFactory.read(in);
+
+		// Metadata for the LZW mapping.
+		final int nRows = in.readInt();
+		final int nUnique = in.readInt();
+
+		// Read the compressed mapping array.
+		final int len = in.readInt();
+		if(len < 0)
+			throw new IOException("Invalid LZW data length: " + len);
+
+		final int[] dataLZW = new int[len];
+		for(int i = 0; i < len; i++)
+			dataLZW[i] = in.readInt();
+
+		// cachedCounts is currently not serialized (mirrors ColGroupDDC.read, which passes null).
+		return new ColGroupDDCLZW(colIndexes, dict, dataLZW, nRows, nUnique, null);
+	}
+
+	// Serialize a ColGroupDDCLZW object into a binary stream (see the size sketch below).
+	@Override
+	public void write(DataOutput out) throws IOException {
+		_colIndexes.write(out);
+		_dict.write(out);
+		out.writeInt(_nRows);
+		out.writeInt(_nUnique);
+		out.writeInt(_dataLZW.length);
+		for(int i : _dataLZW)
+			out.writeInt(i);
+	}
+
+	@Override
+	public double getIdx(int r, int colIdx) {
+		// TODO: should be fast; currently decodes the prefix up to row r on every call.
+		final AMapToData map = decompress(_dataLZW, _nUnique, _nRows, r + 1);
+		return _dict.getValue(map.getIndex(r), colIdx, _colIndexes.size());
+	}
+
+	@Override
+	public CompressionType getCompType() {
+		return CompressionType.DDCLZW;
+	}
+
+	@Override
+	protected ColGroupType getColGroupType() {
+		return ColGroupType.DDCLZW;
+	}
+
+	@Override
+	public boolean containsValue(double pattern) {
+		return _dict.containsValue(pattern);
+	}
+
+	@Override
+	public double getCost(ComputationCostEstimator e, int nRows) {
+		final int nVals = getNumValues();
+		final int nCols = getNumCols();
+		return e.getCost(nRows, nRows, nCols, nVals, _dict.getSparsity());
+	}
+
+	@Override
+	public ICLAScheme getCompressionScheme() {
+		// TODO: not implemented in ColGroupDDCFor either - should we create one? Content: nCols as in DDC.
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected int numRowsToMultiply() {
+		return _nRows;
+	}
+
+	@Override
+	protected AColGroup copyAndSet(IColIndex colIndexes, IDictionary newDictionary) {
+		return new ColGroupDDCLZW(colIndexes, newDictionary, _dataLZW, _nRows, _nUnique, getCachedCounts());
+	}
+
+	@Override
+	public AMapToData getMapToData() {
+		// Alternatively: return decompress(_dataLZW, _nUnique, _nRows, _nRows);
+		throw new NotImplementedException();
+	}
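Because write() fixes the serialized layout, the exact on-disk size is computable. A sketch of getExactSizeOnDisk(), under the assumption (not verified here) that the superclass accounts for _colIndexes and _dict the way sibling column groups handle their shared state:

    // Sketch only: super.getExactSizeOnDisk() covering _colIndexes and _dict is an assumption.
    @Override
    public long getExactSizeOnDisk() {
        long ret = super.getExactSizeOnDisk(); // assumed to cover _colIndexes and _dict
        ret += 4;                     // _nRows
        ret += 4;                     // _nUnique
        ret += 4;                     // _dataLZW.length
        ret += 4L * _dataLZW.length;  // the LZW codes
        return ret;
    }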
+	@Override
+	public boolean sameIndexStructure(AColGroupCompressed that) {
+		return that instanceof ColGroupDDCLZW && ((ColGroupDDCLZW) that)._dataLZW == _dataLZW;
+	}
+
+	@Override
+	protected double computeMxx(double c, Builtin builtin) {
+		return _dict.aggregate(c, builtin);
+	}
+
+	@Override
+	protected void computeColMxx(double[] c, Builtin builtin) {
+		_dict.aggregateCols(c, builtin, _colIndexes);
+	}
+
+	@Override
+	public AColGroup sliceRows(int rl, int ru) {
+		try {
+			// Decode the prefix up to ru, then slice out the requested range.
+			final AMapToData map = decompress(_dataLZW, _nUnique, _nRows, ru);
+			return ColGroupDDCLZW.create(_colIndexes, _dict, map.slice(rl, ru), null);
+		}
+		catch(Exception e) {
+			throw new DMLRuntimeException("Failed to slice out sub part DDCLZW: " + rl + ", " + ru, e);
+		}
+	}
+
+	// Methods below are not yet implemented and throw instead of silently doing nothing.
+	@Override
+	protected void decompressToDenseBlockTransposedSparseDictionary(DenseBlock db, int rl, int ru, SparseBlock dict) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected void decompressToDenseBlockTransposedDenseDictionary(DenseBlock db, int rl, int ru, double[] dict) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected void decompressToSparseBlockTransposedSparseDictionary(SparseBlockMCSR db, SparseBlock dict, int nColOut) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected void decompressToSparseBlockTransposedDenseDictionary(SparseBlockMCSR db, double[] dict, int nColOut) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected void decompressToDenseBlockSparseDictionary(DenseBlock db, int rl, int ru, int offR, int offC, SparseBlock sb) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC, double[] values) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected void decompressToSparseBlockSparseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC, SparseBlock sb) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC, double[] values) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock result, int rl, int ru, int cl, int cu) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public AColGroup scalarOperation(ScalarOperator op) {
+		// TODO: can run on the dictionary alone; see the sketch after this hunk.
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public AColGroup binaryRowOpLeft(BinaryOperator op, double[] v, boolean isRowSafe) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public AColGroup binaryRowOpRight(BinaryOperator op, double[] v, boolean isRowSafe) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public AColGroup unaryOperation(UnaryOperator op) {
+		// TODO: can run on the dictionary alone; see the sketch after this hunk.
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public AColGroup append(AColGroup g) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected AColGroup appendNInternal(AColGroup[] groups, int blen, int rlen) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public AColGroup recompress() {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public CompressedSizeInfoColGroup getCompressionInfo(int nRow) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected AColGroup fixColIndexes(IColIndex newColIndex, int[] reordering) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected void sparseSelection(MatrixBlock selection, P[] points, MatrixBlock ret, int rl, int ru) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected void denseSelection(MatrixBlock selection, P[] points, MatrixBlock ret, int rl, int ru) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public AColGroup[] splitReshape(int multiplier, int nRow, int nColOrg) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected boolean allowShallowIdentityRightMult() {
+		return false;
+	}
+
+	@Override
+	protected AColGroup allocateRightMultiplication(MatrixBlock right, IColIndex colIndexes, IDictionary preAgg) {
+		throw new NotImplementedException();
+	}
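Since scalar and unary operations only transform values, they can run on the dictionary and reuse the compressed mapping untouched, which is exactly what makes DDCLZW attractive here. A hedged sketch, assuming IDictionary.applyScalarOp/applyUnaryOp behave as used by the sibling DDC group (not verified against this PR):

    // Sketch only: applyScalarOp/applyUnaryOp on IDictionary are assumptions borrowed from ColGroupDDC.
    @Override
    public AColGroup scalarOperation(ScalarOperator op) {
        // Only the dictionary changes; _dataLZW is shared unchanged, so no recompression is needed.
        return new ColGroupDDCLZW(_colIndexes, _dict.applyScalarOp(op), _dataLZW, _nRows, _nUnique, getCachedCounts());
    }

    @Override
    public AColGroup unaryOperation(UnaryOperator op) {
        return new ColGroupDDCLZW(_colIndexes, _dict.applyUnaryOp(op), _dataLZW, _nRows, _nUnique, getCachedCounts());
    }

The value-independence of the mapping is the design point: the number of dictionary tuples stays the same, so _nUnique and the cached counts remain valid.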
+	@Override
+	public void preAggregateDense(MatrixBlock m, double[] preAgg, int rl, int ru, int cl, int cu) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public void preAggregateSparse(SparseBlock sb, double[] preAgg, int rl, int ru, int cl, int cu) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected void preAggregateThatDDCStructure(ColGroupDDC that, Dictionary ret) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected void preAggregateThatSDCZerosStructure(ColGroupSDCZeros that, Dictionary ret) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected void preAggregateThatSDCSingleZerosStructure(ColGroupSDCSingleZeros that, Dictionary ret) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected void preAggregateThatRLEStructure(ColGroupRLE that, Dictionary ret) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public void leftMMIdentityPreAggregateDense(MatrixBlock that, MatrixBlock ret, int rl, int ru, int cl, int cu) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected int[] getCounts(int[] out) {
+		// Counts require the full mapping; decode once and count. (TODO: count directly on the codes.)
+		final AMapToData map = decompress(_dataLZW, _nUnique, _nRows, _nRows);
+		return map.getCounts(out);
+	}

Review Comment: If not properly implemented, throw NotImplementedException

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
