This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 678b28a2210dcdf11841d9486d1dac4fdacb036d
Author: baunsgaard <[email protected]>
AuthorDate: Wed Oct 19 16:38:16 2022 +0200

    [MINOR] Disable Hadoop SequenceFile write compression and refactor CLALibCombine
---
 .../sysds/runtime/compress/io/CompressWrap.java    |  1 -
 .../runtime/compress/io/WriterCompressed.java      |  3 +-
 .../sysds/runtime/compress/lib/CLALibCombine.java  | 93 +++++++++++-----------
 3 files changed, 48 insertions(+), 49 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java b/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java
index e3e370b694..c17edc2b6d 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java
@@ -20,7 +20,6 @@
 package org.apache.sysds.runtime.compress.io;
 
 import org.apache.spark.api.java.function.Function;
-import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 public class CompressWrap implements Function<MatrixBlock, 
CompressedWriteBlock> {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
index d2194b98f2..82f2be59e4 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
@@ -124,7 +124,8 @@ public final class WriterCompressed extends MatrixWriter {
                // Make Writer (New interface)
                final Writer w = SequenceFile.createWriter(job, 
Writer.file(path), Writer.bufferSize(4096),
                        Writer.blockSize(4096), 
Writer.keyClass(MatrixIndexes.class), 
Writer.valueClass(CompressedWriteBlock.class),
-                       
Writer.compression(SequenceFile.CompressionType.RECORD), 
Writer.replication((short) 1));
+                       Writer.compression(SequenceFile.CompressionType.NONE), 
// No Compression type on disk
+                        Writer.replication((short) 1));
 
                final int rlen = src.getNumRows();
                final int clen = src.getNumColumns();
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java
index 7600167250..ca54f7d181 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java
@@ -32,13 +32,14 @@ import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 
+import edu.emory.mathcs.backport.java.util.Arrays;
+
 public class CLALibCombine {
 
        protected static final Log LOG = 
LogFactory.getLog(CLALibCombine.class.getName());
 
        public static MatrixBlock combine(Map<MatrixIndexes, MatrixBlock> m) {
                // Dynamically find rlen, clen and blen;
-
                // assume that the blen is the same in all blocks.
                // assume that all blocks are there ...
                final MatrixIndexes lookup = new MatrixIndexes(1, 1);
@@ -57,15 +58,21 @@ public class CLALibCombine {
                        lookup.setIndexes(1, lookup.getColumnIndex() + 1);
                }
 
-               return combine(m, (int) rows, (int) cols, blen);
+               return combine(m, lookup, (int) rows, (int) cols, blen);
        }
 
        public static MatrixBlock combine(Map<MatrixIndexes, MatrixBlock> m, 
int rlen, int clen, int blen) {
+               final MatrixIndexes lookup = new MatrixIndexes();
+               return combine(m, lookup, rlen, clen, blen);
+       }
+
+       private static MatrixBlock combine(final Map<MatrixIndexes, 
MatrixBlock> m, final MatrixIndexes lookup,
+               final int rlen, final int clen, final int blen) {
 
                if(rlen < blen) // Shortcut, in case file only contains one 
block in r length.
-                       return CombiningColumnGroups(m, rlen, clen, blen);
+                       return CombiningColumnGroups(m, lookup, rlen, clen, 
blen);
+
                final CompressionType[] colTypes = new CompressionType[clen];
-               final MatrixIndexes lookup = new MatrixIndexes();
                // Look through the first blocks in to the top.
                for(int bc = 0; bc * blen < clen; bc++) {
                        lookup.setIndexes(1, bc + 1); // get first blocks
@@ -119,10 +126,11 @@ public class CLALibCombine {
                        }
                }
 
-               return CombiningColumnGroups(m, rlen, clen, blen);
+               return CombiningColumnGroups(m, lookup, rlen, clen, blen);
        }
 
-       private static MatrixBlock combineViaDecompression(Map<MatrixIndexes, 
MatrixBlock> m, int rlen, int clen, int blen) {
+       private static MatrixBlock combineViaDecompression(final 
Map<MatrixIndexes, MatrixBlock> m, final int rlen,
+               final int clen, final int blen) {
                final MatrixBlock out = new MatrixBlock(rlen, clen, false);
                out.allocateDenseBlock();
                for(Entry<MatrixIndexes, MatrixBlock> e : m.entrySet()) {
@@ -140,57 +148,48 @@ public class CLALibCombine {
        }
 
        // It is known all of the matrices are Compressed and they are non 
overlapping.
-       private static MatrixBlock CombiningColumnGroups(Map<MatrixIndexes, 
MatrixBlock> m, int rlen, int clen, int blen) {
+       private static MatrixBlock CombiningColumnGroups(final 
Map<MatrixIndexes, MatrixBlock> m, final MatrixIndexes lookup,
+               final int rlen, final int clen, final int blen) {
 
-               final AColGroup[] finalCols = new AColGroup[clen];
-               final MatrixIndexes lookup = new MatrixIndexes();
-               for(int bc = 0; bc * blen < clen; bc++) {
-                       lookup.setIndexes(1, bc + 1); // get first blocks
-                       final MatrixBlock b = m.get(lookup);
-                       final CompressedMatrixBlock cmb = 
(CompressedMatrixBlock) b;
-
-                       final List<AColGroup> gs = cmb.getColGroups();
-                       for(AColGroup g : gs) {
-                               AColGroup gc = g;
-                               if(bc > 0)
-                                       gc = g.shiftColIndices(bc * blen);
-                               final int[] cols = gc.getColIndices();
-                               finalCols[cols[0]] = gc; // only assign first 
column of each group.
-                       }
-               }
+               final AColGroup[][] finalCols = new AColGroup[clen][]; // temp 
array for combining
+               final int blocksInColumn = (rlen - 1) / blen + 1;
+               final int nGroups = m.size() / blocksInColumn;
 
-               for(int br = 1; br * blen < rlen; br++) {
+               // Add all the blocks into linear structure.
+               for(int br = 0; br * blen < rlen; br++) {
                        for(int bc = 0; bc * blen < clen; bc++) {
-                               lookup.setIndexes(br + 1, bc + 1); // get first 
blocks
-                               final MatrixBlock b = m.get(lookup);
-
-                               final CompressedMatrixBlock cmb = 
(CompressedMatrixBlock) b;
-
-                               final List<AColGroup> gs = cmb.getColGroups();
-                               for(AColGroup g : gs) {
-                                       AColGroup gc = g;
-                                       if(bc > 0)
-                                               gc = g.shiftColIndices(bc * 
blen);
+                               lookup.setIndexes(br + 1, bc + 1);
+                               final CompressedMatrixBlock cmb = 
(CompressedMatrixBlock) m.get(lookup);
+                               for(AColGroup g : cmb.getColGroups()) {
+                                       final AColGroup gc = bc > 0 ? 
g.shiftColIndices(bc * blen) : g;
                                        final int[] cols = gc.getColIndices();
-                                       AColGroup prev = finalCols[cols[0]];
-                                       AColGroup comb = prev.append(gc);
-                                       if(comb == null) {
-                                               LOG.warn("Combining of columns 
from group: " + prev.getClass().getSimpleName() + " and "
-                                                       + 
gc.getClass().getSimpleName() + " was non trivial, therefore falling back to 
decompression");
-                                               return 
combineViaDecompression(m, rlen, clen, blen);
-                                       }
-                                       finalCols[cols[0]] = comb;
+                                       if(br == 0)
+                                               finalCols[cols[0]] = new 
AColGroup[blocksInColumn];
 
+                                       finalCols[cols[0]][br] = gc;
                                }
                        }
                }
 
-               List<AColGroup> finalGroups = new ArrayList<>();
-
-               for(AColGroup g : finalCols)
-                       if(g != null)
-                               finalGroups.add(g);
+               final List<AColGroup> finalGroups = new ArrayList<>(nGroups);
+               for(AColGroup[] colOfGroups : finalCols) {
+                       if(colOfGroups != null) { // skip null entries
+                               final AColGroup combined = 
combineN(colOfGroups);
+                               if(combined == null) {
+                                       LOG.warn("Combining of columns from 
group failed: " + Arrays.toString(colOfGroups));
+                                       return combineViaDecompression(m, rlen, 
clen, blen);
+                               }
+                               finalGroups.add(combined);
+                       }
+               }
 
                return new CompressedMatrixBlock(rlen, clen, -1, false, 
finalGroups);
        }
+
+       private static AColGroup combineN(AColGroup[] groups) {
+               AColGroup r = groups[0];
+               for(int i = 1; i < groups.length && r != null; i++)
+                       r = r.append(groups[i]);
+               return r;
+       }
 }

Reply via email to