This is an automated email from the ASF dual-hosted git repository.

estrauss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f7b17badb [SYSTEMDS-3902] Sparse data transfer: Python --> Java
3f7b17badb is described below

commit 3f7b17badbf81ace6e811c2194a8817456ec4de9
Author: e-strauss <[email protected]>
AuthorDate: Thu Feb 12 15:47:43 2026 +0100

    [SYSTEMDS-3902] Sparse data transfer: Python --> Java
    
    This commit implements optimized data transfer for SciPy sparse matrices
    from Python to the Java runtime. Key changes include the addition of
    `convertSciPyCSRToMB` and `convertSciPyCOOToMB` in the Java utility layer to
    directly handle compressed sparse row and coordinate formats. On the Python
    side, the `SystemDSContext` now supports a `sparse_data_transfer` flag and a
    new `from_py` method to unify data ingestion. These updates allow sparse data
    to be transferred without being convert [...]
    
    Closes #2379.
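    
    A minimal usage sketch of the new ingestion path (illustrative only; it
    assumes a working local SystemDS Python environment and uses the from_py
    API described above):
    
        import scipy.sparse as sp
        from systemds.context import SystemDSContext
    
        # sparse_data_transfer=True (the default) keeps SciPy matrices in
        # sparse form during the Python --> Java transfer instead of
        # densifying them first.
        with SystemDSContext(sparse_data_transfer=True) as sds:
            A = sp.rand(1000, 1000, density=0.01, format="csr")
            X = sds.from_py(A)           # Matrix node backed by the sparse data
            res = (X + 1).compute()      # triggers the transfer to the JVM
    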
---
 .../sysds/runtime/util/Py4jConverterUtils.java     |  42 ++
 .../python/systemds/context/systemds_context.py    |  79 +++-
 src/main/python/systemds/utils/converters.py       | 327 +++++++------
 .../python/tests/matrix/test_block_converter.py    | 145 +++++-
 .../frame/array/Py4jConverterUtilsTest.java        | 240 ----------
 .../component/utils/Py4jConverterUtilsTest.java    | 510 +++++++++++++++++++++
 6 files changed, 952 insertions(+), 391 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
index 7faee722d0..f705a3c9c0 100644
--- a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.log4j.Logger;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.frame.data.columns.Array;
@@ -35,6 +36,7 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
  * Utils for converting python data to java.
  */
 public class Py4jConverterUtils {
+       private static final Logger LOG = Logger.getLogger(Py4jConverterUtils.class);
        public static MatrixBlock convertPy4JArrayToMB(byte[] data, int rlen, int clen) {
                return convertPy4JArrayToMB(data, rlen, clen, false, Types.ValueType.FP64);
        }
@@ -63,6 +65,45 @@ public class Py4jConverterUtils {
                return mb;
        }
 
+       public static MatrixBlock convertSciPyCSRToMB(byte[] data, byte[] indices, byte[] indptr, int rlen, int clen, int nnz) {
+               LOG.debug("Converting compressed sparse row matrix to MatrixBlock");
+               MatrixBlock mb = new MatrixBlock(rlen, clen, true);
+               mb.allocateSparseRowsBlock(false);
+               ByteBuffer dataBuf = ByteBuffer.wrap(data);
+               dataBuf.order(ByteOrder.nativeOrder());
+               ByteBuffer indicesBuf = ByteBuffer.wrap(indices);
+               indicesBuf.order(ByteOrder.nativeOrder());
+               ByteBuffer indptrBuf = ByteBuffer.wrap(indptr);
+               indptrBuf.order(ByteOrder.nativeOrder());
+               
+               // Read indptr array to get row boundaries
+               int[] rowPtrs = new int[rlen + 1];
+               for(int i = 0; i <= rlen; i++) {
+                       rowPtrs[i] = indptrBuf.getInt();
+               }
+               
+               // Iterate through each row
+               for(int row = 0; row < rlen; row++) {
+                       int startIdx = rowPtrs[row];
+                       int endIdx = rowPtrs[row + 1];
+                       
+                       // Set buffer positions to the start of this row
+                       dataBuf.position(startIdx * Double.BYTES);
+                       indicesBuf.position(startIdx * Integer.BYTES);
+                       
+                       // Process all non-zeros in this row sequentially
+                       for(int idx = startIdx; idx < endIdx; idx++) {
+                               double val = dataBuf.getDouble();
+                               int colIndex = indicesBuf.getInt();
+                               mb.set(row, colIndex, val);
+                       }
+               }
+               
+               mb.recomputeNonZeros();
+               mb.examSparsity();
+               return mb;
+       }
+
        public static MatrixBlock allocateDenseOrSparse(int rlen, int clen, boolean isSparse) {
                MatrixBlock ret = new MatrixBlock(rlen, clen, isSparse);
                ret.allocateBlock();
@@ -208,6 +249,7 @@ public class Py4jConverterUtils {
        public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb) {
                byte[] ret = null;
                if(mb.isInSparseFormat()) {
+                       LOG.debug("Converting sparse matrix to dense");
                        mb.sparseToDense();
                }
 
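
For reference, the three byte arrays consumed by convertSciPyCSRToMB map directly
onto SciPy's CSR buffers, which the Python side serializes with tobytes() and the
Java side reads back in native byte order. A small illustrative sketch (the 3x3
matrix is an arbitrary example, not taken from the patch):

    import numpy as np
    import scipy.sparse as sp

    # 3x3 matrix with non-zeros at (0,0)=1.0, (0,2)=2.0 and (2,1)=3.0
    A = sp.csr_matrix((np.array([1.0, 2.0, 3.0]),
                       (np.array([0, 0, 2]), np.array([0, 2, 1]))), shape=(3, 3))

    print(A.data)     # [1. 2. 3.]  -> 'data'    (float64 values)
    print(A.indices)  # [0 2 1]     -> 'indices' (column index of each value)
    print(A.indptr)   # [0 2 2 3]   -> 'indptr'  (row i spans indptr[i]:indptr[i+1])
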
diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py
index 99a6cba57b..7268e5b86a 100644
--- a/src/main/python/systemds/context/systemds_context.py
+++ b/src/main/python/systemds/context/systemds_context.py
@@ -29,17 +29,19 @@ import socket
 import sys
 import struct
 import traceback
+import warnings
 from contextlib import contextmanager
 from glob import glob
 from queue import Queue
 from subprocess import PIPE, Popen
 from threading import Thread
-from time import sleep, time
+from time import sleep
 from typing import Dict, Iterable, Sequence, Tuple, Union
 from concurrent.futures import ThreadPoolExecutor
 
 import numpy as np
 import pandas as pd
+import scipy.sparse as sp
 from py4j.java_gateway import GatewayParameters, JavaGateway, Py4JNetworkError
 from systemds.operator import (
     Frame,
@@ -77,6 +79,7 @@ class SystemDSContext(object):
     _FIFO_JAVA2PY_PIPES = []
     _data_transfer_mode = 0
     _multi_pipe_enabled = False
+    _sparse_data_transfer = True
     _logging_initialized = False
     _executor_pool = ThreadPoolExecutor(max_workers=os.cpu_count() * 2 or 4)
 
@@ -89,6 +92,7 @@ class SystemDSContext(object):
         py4j_logging_level: int = 50,
         data_transfer_mode: int = 1,
         multi_pipe_enabled: bool = False,
+        sparse_data_transfer: bool = True,
     ):
         """Starts a new instance of SystemDSContext, in which the connection 
to a JVM systemds instance is handled
         Any new instance of this SystemDS Context, would start a separate new 
JVM.
@@ -103,14 +107,26 @@ class SystemDSContext(object):
             The logging levels are as follows: 10 DEBUG, 20 INFO, 30 WARNING, 40 ERROR, 50 CRITICAL.
         :param py4j_logging_level: The logging level for Py4j to use, since all communication to the JVM is done through this,
             it can be verbose if not set high.
-        :param data_transfer_mode: default 0,
+        :param data_transfer_mode: default 1, 0 for py4j, 1 for using pipes (on unix systems)
+        :param multi_pipe_enabled: default False, if True, use multiple pipes for data transfer,
+            only used if data_transfer_mode is 1.
+            .. experimental:: This parameter is experimental and may be removed in a future version.
+        :param sparse_data_transfer: default True, if True, use optimized sparse matrix transfer,
+            if False, convert sparse matrices to dense arrays before transfer
         """
 
+        if multi_pipe_enabled:
+            warnings.warn(
+                "The 'multi_pipe_enabled' parameter is experimental and may be removed in a future version.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
         self.__setup_logging(logging_level, py4j_logging_level)
         self.__start(port, capture_stdout)
         self.capture_stats(capture_statistics)
         self._log.debug("Started JVM and SystemDS python context manager")
         self.__setup_data_transfer(data_transfer_mode, multi_pipe_enabled)
+        self._sparse_data_transfer = sparse_data_transfer
 
     def __setup_data_transfer(self, data_transfer_mode=0, multi_pipe_enabled=False):
         self._data_transfer_mode = data_transfer_mode
@@ -769,21 +785,65 @@ class SystemDSContext(object):
         # therefore the output type is assign.
         return Scalar(self, v, assign=True)
 
+    def from_py(
+        self,
+        src: Union[np.ndarray, sp.spmatrix, pd.DataFrame, pd.Series],
+        *args: Sequence[VALID_INPUT_TYPES],
+        **kwargs: Dict[str, VALID_INPUT_TYPES],
+    ) -> Union[Matrix, Frame]:
+        """Generate DAGNode representing data given by a python object, which will be sent to SystemDS on need.
+        :param src: the python object
+        :param args: unnamed parameters
+        :param kwargs: named parameters
+        :return: A Matrix or Frame Node
+        """
+
+        def get_params(src, args, kwargs):
+            unnamed_params = ["'./tmp/{file_name}'"]
+            if len(src.shape) == 2:
+                named_params = {"rows": src.shape[0], "cols": src.shape[1]}
+            elif len(src.shape) == 1:
+                named_params = {"rows": src.shape[0], "cols": 1}
+            else:
+                # TODO Support tensors.
+                raise ValueError("Only two dimensional arrays supported")
+            unnamed_params.extend(args)
+            named_params.update(kwargs)
+            return unnamed_params, named_params
+
+        if isinstance(src, np.ndarray):
+            unnamed_params, named_params = get_params(src, args, kwargs)
+            return Matrix(self, "read", unnamed_params, named_params, local_data=src)
+        elif isinstance(src, sp.spmatrix):
+            unnamed_params, named_params = get_params(src, args, kwargs)
+            return Matrix(self, "read", unnamed_params, named_params, local_data=src)
+        elif isinstance(src, pd.DataFrame):
+            unnamed_params, named_params = get_params(src, args, kwargs)
+            named_params["data_type"] = '"frame"'
+            return Frame(self, "read", unnamed_params, named_params, local_data=src)
+        elif isinstance(src, pd.Series):
+            unnamed_params, named_params = get_params(src, args, kwargs)
+            named_params["data_type"] = '"frame"'
+            return Frame(self, "read", unnamed_params, named_params, local_data=src)
+        else:
+            raise ValueError(f"Unsupported data type: {type(src)}")
+
     def from_numpy(
         self,
-        mat: np.array,
+        mat: Union[np.ndarray, sp.spmatrix],
         *args: Sequence[VALID_INPUT_TYPES],
         **kwargs: Dict[str, VALID_INPUT_TYPES],
     ) -> Matrix:
-        """Generate DAGNode representing matrix with data given by a numpy array, which will be sent to SystemDS
-        on need.
+        """Generate DAGNode representing matrix with data given by a numpy array or scipy sparse matrix,
+        which will be sent to SystemDS on need.
 
-        :param mat: the numpy array
+        :param mat: the numpy array or scipy sparse matrix
         :param args: unnamed parameters
         :param kwargs: named parameters
         :return: A Matrix
+        Note: This method is deprecated. Use from_py instead.
         """
-
+        self._log.warning("Deprecated method from_numpy. Use from_py instead.")
         unnamed_params = ["'./tmp/{file_name}'"]
 
         if len(mat.shape) == 2:
@@ -811,7 +871,9 @@ class SystemDSContext(object):
         :param args: unnamed parameters
         :param kwargs: named parameters
         :return: A Frame
+        Note: This method is deprecated. Use from_py instead.
         """
+        self._log.warning("Deprecated method from_pandas. Use from_py instead.")
         unnamed_params = ["'./tmp/{file_name}'"]
 
         if len(df.shape) == 2:
@@ -824,9 +886,6 @@ class SystemDSContext(object):
 
         unnamed_params.extend(args)
         named_params["data_type"] = '"frame"'
-
-        self._pd_dataframe = df
-
         named_params.update(kwargs)
         return Frame(self, "read", unnamed_params, named_params, local_data=df)
 
diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py
index 5f4619a8bb..7cabf2aabd 100644
--- a/src/main/python/systemds/utils/converters.py
+++ b/src/main/python/systemds/utils/converters.py
@@ -21,11 +21,13 @@
 
 import struct
 from time import time
+from typing import Union
 import numpy as np
 import pandas as pd
 import concurrent.futures
 from py4j.java_gateway import JavaClass, JavaGateway, JavaObject, JVMView
 import os
+import scipy.sparse as sp
 
 # Constants
 _HANDSHAKE_OFFSET = 1000
@@ -86,129 +88,6 @@ def _pipe_receive_bytes(pipe, view, offset, end, batch_size_bytes, logger):
         offset += actual_size
 
 
-def _pipe_receive_strings(
-    pipe, num_strings, batch_size=_DEFAULT_BATCH_SIZE_BYTES, pipe_id=0, logger=None
-):
-    """
-    Reads UTF-8 encoded strings from the pipe in batches.
-    Format: <I (little-endian int32) length prefix, followed by UTF-8 bytes.
-
-    Returns: tuple of (strings_list, total_time, decode_time, io_time, num_strings)
-    """
-    t_total_start = time()
-    t_decode = 0.0
-    t_io = 0.0
-
-    strings = []
-    fd = pipe.fileno()  # Cache file descriptor
-
-    # Use a reusable buffer to avoid repeated allocations
-    buf = bytearray(batch_size * 2)
-    buf_pos = 0
-    buf_remaining = 0  # Number of bytes already in buffer
-
-    i = 0
-    while i < num_strings:
-        # If we don't have enough bytes for the length prefix, read more
-        if buf_remaining < _STRING_LENGTH_PREFIX_SIZE:
-            # Shift remaining bytes to start of buffer
-            if buf_remaining > 0:
-                buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining]
-
-            # Read more data
-            t0 = time()
-            chunk = os.read(fd, batch_size)
-            t_io += time() - t0
-            if not chunk:
-                raise IOError("Pipe read returned empty data unexpectedly")
-
-            # Append new data to buffer
-            chunk_len = len(chunk)
-            if buf_remaining + chunk_len > len(buf):
-                # Grow buffer if needed
-                new_buf = bytearray(len(buf) * 2)
-                new_buf[:buf_remaining] = buf[:buf_remaining]
-                buf = new_buf
-
-            buf[buf_remaining : buf_remaining + chunk_len] = chunk
-            buf_remaining += chunk_len
-            buf_pos = 0
-
-        # Read length prefix (little-endian int32)
-        # Note: length can be -1 (0xFFFFFFFF) to indicate null value
-        length = struct.unpack(
-            "<i", buf[buf_pos : buf_pos + _STRING_LENGTH_PREFIX_SIZE]
-        )[0]
-        buf_pos += _STRING_LENGTH_PREFIX_SIZE
-        buf_remaining -= _STRING_LENGTH_PREFIX_SIZE
-
-        # Handle null value (marked by -1)
-        if length == -1:
-            strings.append(None)
-            i += 1
-            continue
-
-        # If we don't have enough bytes for the string data, read more
-        if buf_remaining < length:
-            # Shift remaining bytes to start of buffer
-            if buf_remaining > 0:
-                buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining]
-            buf_pos = 0
-
-            # Read more data until we have enough
-            bytes_needed = length - buf_remaining
-            while bytes_needed > 0:
-                t0 = time()
-                chunk = os.read(fd, min(batch_size, bytes_needed))
-                t_io += time() - t0
-                if not chunk:
-                    raise IOError("Pipe read returned empty data unexpectedly")
-
-                chunk_len = len(chunk)
-                if buf_remaining + chunk_len > len(buf):
-                    # Grow buffer if needed
-                    new_buf = bytearray(len(buf) * 2)
-                    new_buf[:buf_remaining] = buf[:buf_remaining]
-                    buf = new_buf
-
-                buf[buf_remaining : buf_remaining + chunk_len] = chunk
-                buf_remaining += chunk_len
-                bytes_needed -= chunk_len
-
-        # Decode the string
-        t0 = time()
-        if length == 0:
-            decoded_str = ""
-        else:
-            decoded_str = buf[buf_pos : buf_pos + length].decode("utf-8")
-        t_decode += time() - t0
-
-        strings.append(decoded_str)
-        buf_pos += length
-        buf_remaining -= length
-        i += 1
-    header_received = False
-    if buf_remaining == _STRING_LENGTH_PREFIX_SIZE:
-        # There is still data in the buffer, probably the handshake header
-        received = struct.unpack(
-            "<i", buf[buf_pos : buf_pos + _STRING_LENGTH_PREFIX_SIZE]
-        )[0]
-        if received != pipe_id + _HANDSHAKE_OFFSET:
-            raise ValueError(
-                "Handshake mismatch: expected {}, got {}".format(
-                    pipe_id + _HANDSHAKE_OFFSET, received
-                )
-            )
-        header_received = True
-    elif buf_remaining > _STRING_LENGTH_PREFIX_SIZE:
-        raise ValueError(
-            "Unexpected number of bytes in buffer: {}".format(buf_remaining)
-        )
-
-    t_total = time() - t_total_start
-    return (strings, t_total, t_decode, t_io, num_strings, header_received)
-
-
 def _get_numpy_value_type(jvm, dtype):
     """Maps numpy dtype to SystemDS ValueType."""
     if dtype is np.dtype(np.uint8):
@@ -280,37 +159,43 @@ def _transfer_matrix_block_multi_pipe(
     return fut_java.result()  # Java returns MatrixBlock
 
 
-def numpy_to_matrix_block(sds, np_arr: np.array):
-    """Converts a given numpy array, to internal matrix block representation.
+def numpy_to_matrix_block(sds, arr: Union[np.ndarray, sp.spmatrix]):
+    """Converts a given numpy array or scipy sparse matrix to internal matrix block representation.
 
     :param sds: The current systemds context.
-    :param np_arr: the numpy array to convert to matrixblock.
+    :param arr: the numpy array or scipy sparse matrix to convert to matrixblock.
     """
-    assert np_arr.ndim <= 2, "np_arr invalid, because it has more than 2 dimensions"
-    rows = np_arr.shape[0]
-    cols = np_arr.shape[1] if np_arr.ndim == 2 else 1
+    assert arr.ndim <= 2, "arr invalid, because it has more than 2 dimensions"
+    rows = arr.shape[0]
+    cols = arr.shape[1] if arr.ndim == 2 else 1
 
     if rows > 2147483647:
         raise ValueError("Matrix rows exceed maximum value (2147483647)")
 
     # If not numpy array then convert to numpy array
-    if not isinstance(np_arr, np.ndarray):
-        np_arr = np.asarray(np_arr, dtype=np.float64)
+    if isinstance(arr, sp.spmatrix):
+        if sds._sparse_data_transfer:
+            return scipy_sparse_matrix_to_matrix_block(sds, arr)
+        else:
+            # Convert sparse matrix to dense array
+            arr = arr.toarray()
+    if not isinstance(arr, np.ndarray):
+        arr = np.asarray(arr, dtype=np.float64)
 
     jvm: JVMView = sds.java_gateway.jvm
     ep = sds.java_gateway.entry_point
 
     # Flatten and set value type
-    if np_arr.dtype is np.dtype(np.uint8):
-        arr = np_arr.ravel()
-    elif np_arr.dtype is np.dtype(np.int32):
-        arr = np_arr.ravel()
-    elif np_arr.dtype is np.dtype(np.float32):
-        arr = np_arr.ravel()
+    if arr.dtype is np.dtype(np.uint8):
+        arr = arr.ravel()
+    elif arr.dtype is np.dtype(np.int32):
+        arr = arr.ravel()
+    elif arr.dtype is np.dtype(np.float32):
+        arr = arr.ravel()
     else:
-        arr = np_arr.ravel().astype(np.float64)
+        arr = arr.ravel().astype(np.float64)
 
-    value_type = _get_numpy_value_type(jvm, np_arr.dtype)
+    value_type = _get_numpy_value_type(jvm, arr.dtype)
 
     if sds._data_transfer_mode == 1:
         mv = memoryview(arr).cast("B")
@@ -334,7 +219,7 @@ def numpy_to_matrix_block(sds, np_arr: np.array):
             )
         else:
             return _transfer_matrix_block_multi_pipe(
-                sds, mv, arr, np_arr, total_bytes, rows, cols, value_type, ep, jvm
+                sds, mv, arr, arr, total_bytes, rows, cols, value_type, ep, jvm
             )
     else:
         # Prepare byte buffer and send data to java via Py4J
@@ -343,6 +228,45 @@ def numpy_to_matrix_block(sds, np_arr: np.array):
         return j_class.convertPy4JArrayToMB(buf, rows, cols, value_type)
 
 
+def scipy_sparse_matrix_to_matrix_block(sds, arr: sp.spmatrix):
+    """Converts a given scipy sparse matrix to an internal matrix block representation.
+
+    :param sds: The current systemds context.
+    :param arr: The scipy sparse matrix to convert to matrixblock.
+    """
+    jvm: JVMView = sds.java_gateway.jvm
+
+    if sds._data_transfer_mode == 1:
+        # single pipe implementation
+        pass
+    else:
+        # py4j implementation
+        j_class: JavaClass = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils
+        if isinstance(arr, sp.csr_matrix):
+            data = arr.data.tobytes()
+            indices = arr.indices.tobytes()
+            indptr = arr.indptr.tobytes()
+            nnz = arr.nnz
+            rows = arr.shape[0]
+            cols = arr.shape[1]
+            # convertSciPyCSRToMB(byte[] data, byte[] indices, byte[] indptr, int rlen, int clen, int nnz)
+            return j_class.convertSciPyCSRToMB(data, indices, indptr, rows, cols, nnz)
+        elif isinstance(arr, sp.coo_matrix):
+            data = arr.data.tobytes()
+            row = arr.row.tobytes()
+            col = arr.col.tobytes()
+            nnz = arr.nnz
+            rows = arr.shape[0]
+            cols = arr.shape[1]
+            # convertSciPyCOOToMB(byte[] data, byte[] row, byte[] col, int rlen, int clen, int nnz)
+            return j_class.convertSciPyCOOToMB(data, row, col, rows, cols, nnz)
+        else:
+            sds._log.info(
+                f"Converting unsupported sparse matrix type: {type(arr).__name__} to CSR for efficient transfer."
+            )
+            return scipy_sparse_matrix_to_matrix_block(sds, arr.tocsr())
+
+
 def matrix_block_to_numpy(sds, mb: JavaObject):
     """Converts a MatrixBlock object in the JVM to a numpy array.
 
@@ -761,6 +685,129 @@ def _pipe_transfer_strings(pipe, pd_series, batch_size=_DEFAULT_BATCH_SIZE_BYTES
     return (t_total, t_encoding, t_packing, t_io, num_strings)
 
 
+def _pipe_receive_strings(
+    pipe, num_strings, batch_size=_DEFAULT_BATCH_SIZE_BYTES, pipe_id=0, logger=None
+):
+    """
+    Reads UTF-8 encoded strings from the pipe in batches.
+    Format: <I (little-endian int32) length prefix, followed by UTF-8 bytes.
+
+    Returns: tuple of (strings_list, total_time, decode_time, io_time, num_strings, header_received)
+    """
+    t_total_start = time()
+    t_decode = 0.0
+    t_io = 0.0
+
+    strings = []
+    fd = pipe.fileno()  # Cache file descriptor
+
+    # Use a reusable buffer to avoid repeated allocations
+    buf = bytearray(batch_size * 2)
+    buf_pos = 0
+    buf_remaining = 0  # Number of bytes already in buffer
+
+    i = 0
+    while i < num_strings:
+        # If we don't have enough bytes for the length prefix, read more
+        if buf_remaining < _STRING_LENGTH_PREFIX_SIZE:
+            # Shift remaining bytes to start of buffer
+            if buf_remaining > 0:
+                buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining]
+
+            # Read more data
+            t0 = time()
+            chunk = os.read(fd, batch_size)
+            t_io += time() - t0
+            if not chunk:
+                raise IOError("Pipe read returned empty data unexpectedly")
+
+            # Append new data to buffer
+            chunk_len = len(chunk)
+            if buf_remaining + chunk_len > len(buf):
+                # Grow buffer if needed
+                new_buf = bytearray(len(buf) * 2)
+                new_buf[:buf_remaining] = buf[:buf_remaining]
+                buf = new_buf
+
+            buf[buf_remaining : buf_remaining + chunk_len] = chunk
+            buf_remaining += chunk_len
+            buf_pos = 0
+
+        # Read length prefix (little-endian int32)
+        # Note: length can be -1 (0xFFFFFFFF) to indicate null value
+        length = struct.unpack(
+            "<i", buf[buf_pos : buf_pos + _STRING_LENGTH_PREFIX_SIZE]
+        )[0]
+        buf_pos += _STRING_LENGTH_PREFIX_SIZE
+        buf_remaining -= _STRING_LENGTH_PREFIX_SIZE
+
+        # Handle null value (marked by -1)
+        if length == -1:
+            strings.append(None)
+            i += 1
+            continue
+
+        # If we don't have enough bytes for the string data, read more
+        if buf_remaining < length:
+            # Shift remaining bytes to start of buffer
+            if buf_remaining > 0:
+                buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining]
+            buf_pos = 0
+
+            # Read more data until we have enough
+            bytes_needed = length - buf_remaining
+            while bytes_needed > 0:
+                t0 = time()
+                chunk = os.read(fd, min(batch_size, bytes_needed))
+                t_io += time() - t0
+                if not chunk:
+                    raise IOError("Pipe read returned empty data unexpectedly")
+
+                chunk_len = len(chunk)
+                if buf_remaining + chunk_len > len(buf):
+                    # Grow buffer if needed
+                    new_buf = bytearray(len(buf) * 2)
+                    new_buf[:buf_remaining] = buf[:buf_remaining]
+                    buf = new_buf
+
+                buf[buf_remaining : buf_remaining + chunk_len] = chunk
+                buf_remaining += chunk_len
+                bytes_needed -= chunk_len
+
+        # Decode the string
+        t0 = time()
+        if length == 0:
+            decoded_str = ""
+        else:
+            decoded_str = buf[buf_pos : buf_pos + length].decode("utf-8")
+        t_decode += time() - t0
+
+        strings.append(decoded_str)
+        buf_pos += length
+        buf_remaining -= length
+        i += 1
+    header_received = False
+    if buf_remaining == _STRING_LENGTH_PREFIX_SIZE:
+        # There is still data in the buffer, probably the handshake header
+        received = struct.unpack(
+            "<i", buf[buf_pos : buf_pos + _STRING_LENGTH_PREFIX_SIZE]
+        )[0]
+        if received != pipe_id + _HANDSHAKE_OFFSET:
+            raise ValueError(
+                "Handshake mismatch: expected {}, got {}".format(
+                    pipe_id + _HANDSHAKE_OFFSET, received
+                )
+            )
+        header_received = True
+    elif buf_remaining > _STRING_LENGTH_PREFIX_SIZE:
+        raise ValueError(
+            "Unexpected number of bytes in buffer: {}".format(buf_remaining)
+        )
+
+    t_total = time() - t_total_start
+    return (strings, t_total, t_decode, t_io, num_strings, header_received)
+
+
 def _get_elem_size_for_type(d_type):
     """Returns the element size in bytes for a given SystemDS type."""
     return {
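
The Python tests below exercise these converters through a round trip; a condensed
sketch of that pattern (illustrative, mirroring convert_back_and_forth in the test
file):

    import numpy as np
    import scipy.sparse as sp
    from systemds.context import SystemDSContext
    from systemds.utils.converters import matrix_block_to_numpy, numpy_to_matrix_block

    # data_transfer_mode=0 forces the Py4J path, i.e. the one that calls
    # convertSciPyCSRToMB / convertSciPyCOOToMB on the Java side.
    with SystemDSContext(capture_stdout=True, logging_level=50, data_transfer_mode=0) as sds:
        A = sp.rand(10, 10, density=0.1, format="csr")
        mb = numpy_to_matrix_block(sds, A)        # sparse transfer into a MatrixBlock
        back = matrix_block_to_numpy(sds, mb)     # dense numpy array coming back
        assert np.allclose(A.toarray(), back)
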
diff --git a/src/main/python/tests/matrix/test_block_converter.py b/src/main/python/tests/matrix/test_block_converter.py
index ef4b28b1bc..afa9ec9963 100644
--- a/src/main/python/tests/matrix/test_block_converter.py
+++ b/src/main/python/tests/matrix/test_block_converter.py
@@ -26,6 +26,7 @@ import numpy as np
 from py4j.java_gateway import JVMView
 from systemds.context import SystemDSContext
 from systemds.utils.converters import matrix_block_to_numpy, numpy_to_matrix_block
+import scipy.sparse as sp
 
 
 class Test_MatrixBlockConverter(unittest.TestCase):
@@ -35,7 +36,9 @@ class Test_MatrixBlockConverter(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
-        cls.sds = SystemDSContext(capture_stdout=True, logging_level=50)
+        cls.sds = SystemDSContext(
+            capture_stdout=True, logging_level=50, data_transfer_mode=0
+        )
 
     @classmethod
     def tearDownClass(cls):
@@ -70,10 +73,150 @@ class Test_MatrixBlockConverter(unittest.TestCase):
         array = np.array([rng.standard_normal(n) for x in range(k)])
         self.convert_back_and_forth(array)
 
+    def test_random_sparse_csr_nxn(self):
+        n = 10
+        array = sp.rand(n, n, density=0.1, format="csr")
+        self.convert_back_and_forth(array)
+
+    def test_sparse_csr_rectangular(self):
+        """Test CSR conversion with rectangular matrices"""
+        array = sp.rand(5, 10, density=0.2, format="csr")
+        self.convert_back_and_forth(array)
+
+    def test_sparse_csr_known_values(self):
+        """Test CSR conversion with a known sparse matrix"""
+        # Create a known CSR matrix
+        data = np.array([1.0, 2.0, 3.0, 4.0])
+        row = np.array([0, 0, 1, 2])
+        col = np.array([0, 2, 1, 2])
+        array = sp.csr_matrix((data, (row, col)), shape=(3, 3))
+        self.convert_back_and_forth(array)
+
+    def test_empty_dense_0x0(self):
+        """Test conversion of empty 0x0 dense matrix"""
+        array = np.array([]).reshape(0, 0)
+        self.convert_back_and_forth(array)
+
+    def test_empty_dense_0xn(self):
+        """Test conversion of empty matrix with zero rows"""
+        array = np.array([]).reshape(0, 5)
+        self.convert_back_and_forth(array)
+
+    def test_empty_dense_nx0(self):
+        """Test conversion of empty matrix with zero columns"""
+        array = np.array([]).reshape(5, 0)
+        self.convert_back_and_forth(array)
+
+    def test_sparse_csr_empty_rows(self):
+        """Test CSR conversion with rows that have no non-zero entries"""
+        # 4x3 matrix: row 0 has 2 entries, row 1 is empty, row 2 has 1 entry, row 3 is empty
+        data = np.array([1.0, 2.0, 3.0])
+        row = np.array([0, 0, 2])
+        col = np.array([0, 2, 1])
+        array = sp.csr_matrix((data, (row, col)), shape=(4, 3))
+        self.convert_back_and_forth(array)
+
+    def test_sparse_csr_first_and_last_row_empty(self):
+        """Test CSR with first and last rows empty"""
+        data = np.array([1.0, 2.0])
+        row = np.array([1, 1])
+        col = np.array([0, 1])
+        array = sp.csr_matrix((data, (row, col)), shape=(3, 2))
+        self.convert_back_and_forth(array)
+
+    def test_sparse_csr_all_zeros(self):
+        """Test CSR with no non-zero entries (empty structure)"""
+        array = sp.csr_matrix((3, 4))
+        self.convert_back_and_forth(array)
+
+    def test_sparse_coo_empty_rows(self):
+        """Test COO conversion with empty rows"""
+        data = np.array([1.0, 2.0])
+        row = np.array([0, 2])
+        col = np.array([1, 1])
+        array = sp.coo_matrix((data, (row, col)), shape=(4, 2))
+        self.convert_back_and_forth(array)
+
+    def test_dense_all_zeros(self):
+        """Test dense matrix with all zeros"""
+        array = np.zeros((4, 3))
+        self.convert_back_and_forth(array)
+
+    def test_dense_single_row(self):
+        """Test dense matrix with single row (1xn)"""
+        array = np.array([[1.0, 2.0, 3.0]])
+        self.convert_back_and_forth(array)
+
+    def test_dense_single_column(self):
+        """Test dense matrix with single column (nx1)"""
+        array = np.array([[1.0], [2.0], [3.0]])
+        self.convert_back_and_forth(array)
+
+    def test_random_sparse_coo_nxn(self):
+        n = 10
+        array = sp.rand(n, n, density=0.1, format="coo")
+        self.convert_back_and_forth(array)
+
+    def test_sparse_csr_empty_0x0(self):
+        """Test empty 0x0 CSR matrix"""
+        array = sp.csr_matrix((0, 0))
+        self.convert_back_and_forth(array)
+
+    def test_sparse_csr_empty_0xn(self):
+        """Test CSR with zero rows"""
+        array = sp.csr_matrix((0, 4))
+        self.convert_back_and_forth(array)
+
+    def test_sparse_csr_empty_nx0(self):
+        """Test CSR with zero columns"""
+        array = sp.csr_matrix((4, 0))
+        self.convert_back_and_forth(array)
+
+    def test_sparse_csr_single_element(self):
+        """Test 1x1 CSR with one non-zero"""
+        array = sp.csr_matrix(([3.14], ([0], [0])), shape=(1, 1))
+        self.convert_back_and_forth(array)
+
+    def test_sparse_csr_single_row(self):
+        """Test 1xn CSR (single row)"""
+        array = sp.csr_matrix(([1.0, 2.0], ([0, 0], [0, 2])), shape=(1, 4))
+        self.convert_back_and_forth(array)
+
+    def test_sparse_csr_single_column(self):
+        """Test nx1 CSR (single column)"""
+        array = sp.csr_matrix(([1.0, 2.0], ([0, 2], [0, 0])), shape=(4, 1))
+        self.convert_back_and_forth(array)
+
+    def test_sparse_csr_empty_columns(self):
+        """Test CSR where some columns have no non-zero entries"""
+        # 3x4 matrix: only columns 0 and 2 have entries
+        data = np.array([1.0, 2.0, 3.0])
+        row = np.array([0, 1, 2])
+        col = np.array([0, 2, 2])
+        array = sp.csr_matrix((data, (row, col)), shape=(3, 4))
+        self.convert_back_and_forth(array)
+
+    def test_sparse_coo_single_element(self):
+        """Test COO with single non-zero"""
+        array = sp.coo_matrix(([1.0], ([2], [3])), shape=(4, 5))
+        self.convert_back_and_forth(array)
+
+    def test_sparse_coo_all_zeros(self):
+        """Test COO with no non-zero entries"""
+        array = sp.coo_matrix((2, 3))
+        self.convert_back_and_forth(array)
+
+    def test_sparse_csc_rectangular(self):
+        """Test CSC conversion (fallback path: converted to dense in 
converter)"""
+        array = sp.csc_matrix(([1.0, 2.0, 3.0], ([0, 1, 2], [0, 0, 1])), 
shape=(3, 2))
+        self.convert_back_and_forth(array)
+
     def convert_back_and_forth(self, array):
         matrix_block = numpy_to_matrix_block(self.sds, array)
         # use the ability to call functions on matrix_block.
         returned = matrix_block_to_numpy(self.sds, matrix_block)
+        if isinstance(array, sp.spmatrix):
+            array = array.toarray()
         self.assertTrue(np.allclose(array, returned))
 
 
diff --git a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java b/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
deleted file mode 100644
index 466c3337d8..0000000000
--- a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.test.component.frame.array;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.sysds.common.Types;
-import org.apache.sysds.common.Types.ValueType;
-import org.apache.sysds.runtime.util.Py4jConverterUtils;
-import org.apache.sysds.runtime.frame.data.columns.Array;
-import org.junit.Test;
-
-public class Py4jConverterUtilsTest {
-
-       @Test
-       public void testConvertUINT8() {
-               int numElements = 4;
-               byte[] data = {1, 2, 3, 4};
-               Array<?> result = Py4jConverterUtils.convert(data, numElements, 
Types.ValueType.UINT8);
-               assertNotNull(result);
-               assertEquals(4, result.size());
-               assertEquals(1, result.get(0));
-               assertEquals(2, result.get(1));
-               assertEquals(3, result.get(2));
-               assertEquals(4, result.get(3));
-       }
-
-       @Test
-       public void testConvertINT32() {
-               int numElements = 4;
-               ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 
numElements);
-               buffer.order(ByteOrder.nativeOrder());
-               for(int i = 1; i <= numElements; i++) {
-                       buffer.putInt(i);
-               }
-               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.INT32);
-               assertNotNull(result);
-               assertEquals(4, result.size());
-               assertEquals(1, result.get(0));
-               assertEquals(2, result.get(1));
-               assertEquals(3, result.get(2));
-               assertEquals(4, result.get(3));
-       }
-
-       @Test
-       public void testConvertINT64() {
-               int numElements = 4;
-               ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * 
numElements);
-               buffer.order(ByteOrder.nativeOrder());
-               for(int i = 1; i <= numElements; i++) {
-                       buffer.putLong((long) i);
-               }
-               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.INT64);
-               assertNotNull(result);
-               assertEquals(4, result.size());
-               assertEquals(1L, result.get(0));
-               assertEquals(2L, result.get(1));
-               assertEquals(3L, result.get(2));
-               assertEquals(4L, result.get(3));
-       }
-
-
-       @Test
-       public void testConvertHASH32() {
-               int numElements = 4;
-               ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 
numElements);
-               buffer.order(ByteOrder.nativeOrder());
-               for(int i = 1; i <= numElements; i++) {
-                       buffer.putInt(i);
-               }
-               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.HASH32);
-               assertNotNull(result);
-               assertEquals(4, result.size());
-               assertEquals("1", result.get(0));
-               assertEquals("2", result.get(1));
-               assertEquals("3", result.get(2));
-               assertEquals("4", result.get(3));
-       }
-
-       @Test
-       public void testConvertHASH64() {
-               int numElements = 4;
-               ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * 
numElements);
-               buffer.order(ByteOrder.nativeOrder());
-               for(int i = 1; i <= numElements; i++) {
-                       buffer.putLong((long) i);
-               }
-               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.HASH64);
-               assertNotNull(result);
-               assertEquals(4, result.size());
-               assertEquals("1", result.get(0));
-               assertEquals("2", result.get(1));
-               assertEquals("3", result.get(2));
-               assertEquals("4", result.get(3));
-       }
-
-       @Test
-       public void testConvertFP32() {
-               int numElements = 4;
-               ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES * 
numElements);
-               buffer.order(ByteOrder.nativeOrder());
-               for(float i = 1.1f; i <= numElements + 1; i += 1.0) {
-                       buffer.putFloat(i);
-               }
-               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.FP32);
-               assertNotNull(result);
-               assertEquals(4, result.size());
-               assertEquals(1.1f, result.get(0));
-               assertEquals(2.1f, result.get(1));
-               assertEquals(3.1f, result.get(2));
-               assertEquals(4.1f, result.get(3));
-       }
-
-       @Test
-       public void testConvertFP64() {
-               int numElements = 4;
-               ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * 
numElements);
-               buffer.order(ByteOrder.nativeOrder());
-               for(double i = 1.1; i <= numElements + 1; i += 1.0) {
-                       buffer.putDouble(i);
-               }
-               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.FP64);
-               assertNotNull(result);
-               assertEquals(4, result.size());
-               assertEquals(1.1, result.get(0));
-               assertEquals(2.1, result.get(1));
-               assertEquals(3.1, result.get(2));
-               assertEquals(4.1, result.get(3));
-       }
-
-       @Test
-       public void testConvertBoolean() {
-               int numElements = 4;
-               byte[] data = {1, 0, 1, 0};
-               Array<?> result = Py4jConverterUtils.convert(data, numElements, 
Types.ValueType.BOOLEAN);
-               assertNotNull(result);
-               assertEquals(4, result.size());
-               assertEquals(true, result.get(0));
-               assertEquals(false, result.get(1));
-               assertEquals(true, result.get(2));
-               assertEquals(false, result.get(3));
-       }
-
-       @Test
-       public void testConvertString() {
-               int numElements = 2;
-               String[] strings = {"hello", "world"};
-               ByteBuffer buffer = ByteBuffer.allocate(4 + strings[0].length() 
+ 4 + strings[1].length());
-               buffer.order(ByteOrder.LITTLE_ENDIAN);
-               for(String s : strings) {
-                       buffer.putInt(s.length());
-                       buffer.put(s.getBytes(StandardCharsets.UTF_8));
-               }
-               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.STRING);
-               assertNotNull(result);
-               assertEquals(2, result.size());
-               assertEquals("hello", result.get(0));
-               assertEquals("world", result.get(1));
-       }
-
-       @Test
-       public void testConvertChar() {
-               char[] c = {'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 
'd'};
-               ByteBuffer buffer = ByteBuffer.allocate(Character.BYTES * 
c.length);
-               buffer.order(ByteOrder.LITTLE_ENDIAN);
-               for(char s : c) {
-                       buffer.putChar(s);
-               }
-               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
c.length, Types.ValueType.CHARACTER);
-               assertNotNull(result);
-               assertEquals(c.length, result.size());
-
-               for(int i = 0; i < c.length; i++) {
-                       assertEquals(c[i], result.get(i));
-               }
-       }
-
-       @Test
-       public void testConvertRow() {
-               int numElements = 4;
-               byte[] data = {1, 2, 3, 4};
-               Object[] row = Py4jConverterUtils.convertRow(data, numElements, 
Types.ValueType.UINT8);
-               assertNotNull(row);
-               assertEquals(4, row.length);
-               assertEquals(1, row[0]);
-               assertEquals(2, row[1]);
-               assertEquals(3, row[2]);
-               assertEquals(4, row[3]);
-       }
-
-       @Test
-       public void testConvertFused() {
-               int numElements = 1;
-               byte[] data = {1, 2, 3, 4};
-               Types.ValueType[] valueTypes = {ValueType.UINT8, 
ValueType.UINT8, ValueType.UINT8, ValueType.UINT8};
-               Array<?>[] arrays = Py4jConverterUtils.convertFused(data, 
numElements, valueTypes);
-               assertNotNull(arrays);
-               assertEquals(4, arrays.length);
-               for(int i = 0; i < 4; i++) {
-                       assertEquals(1 + i, arrays[i].get(0));
-               }
-       }
-
-       @Test(expected = Exception.class)
-       public void nullData() {
-               Py4jConverterUtils.convert(null, 14, ValueType.BOOLEAN);
-       }
-
-       @Test(expected = Exception.class)
-       public void nullValueType() {
-               Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, null);
-       }
-
-       @Test(expected = Exception.class)
-       public void unknownValueType() {
-               Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, 
ValueType.UNKNOWN);
-       }
-}
diff --git a/src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java b/src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java
new file mode 100644
index 0000000000..2399f695a7
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.Py4jConverterUtils;
+import org.junit.Test;
+
+public class Py4jConverterUtilsTest {
+
+       @Test
+       public void testConvertUINT8() {
+               int numElements = 4;
+               byte[] data = {1, 2, 3, 4};
+               Array<?> result = Py4jConverterUtils.convert(data, numElements, 
Types.ValueType.UINT8);
+               assertNotNull(result);
+               assertEquals(4, result.size());
+               assertEquals(1, result.get(0));
+               assertEquals(2, result.get(1));
+               assertEquals(3, result.get(2));
+               assertEquals(4, result.get(3));
+       }
+
+       @Test
+       public void testConvertINT32() {
+               int numElements = 4;
+               ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 
numElements);
+               buffer.order(ByteOrder.nativeOrder());
+               for(int i = 1; i <= numElements; i++) {
+                       buffer.putInt(i);
+               }
+               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.INT32);
+               assertNotNull(result);
+               assertEquals(4, result.size());
+               assertEquals(1, result.get(0));
+               assertEquals(2, result.get(1));
+               assertEquals(3, result.get(2));
+               assertEquals(4, result.get(3));
+       }
+
+       @Test
+       public void testConvertINT64() {
+               int numElements = 4;
+               ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * 
numElements);
+               buffer.order(ByteOrder.nativeOrder());
+               for(int i = 1; i <= numElements; i++) {
+                       buffer.putLong((long) i);
+               }
+               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.INT64);
+               assertNotNull(result);
+               assertEquals(4, result.size());
+               assertEquals(1L, result.get(0));
+               assertEquals(2L, result.get(1));
+               assertEquals(3L, result.get(2));
+               assertEquals(4L, result.get(3));
+       }
+
+
+       @Test
+       public void testConvertHASH32() {
+               int numElements = 4;
+               ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 
numElements);
+               buffer.order(ByteOrder.nativeOrder());
+               for(int i = 1; i <= numElements; i++) {
+                       buffer.putInt(i);
+               }
+               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.HASH32);
+               assertNotNull(result);
+               assertEquals(4, result.size());
+               assertEquals("1", result.get(0));
+               assertEquals("2", result.get(1));
+               assertEquals("3", result.get(2));
+               assertEquals("4", result.get(3));
+       }
+
+       @Test
+       public void testConvertHASH64() {
+               int numElements = 4;
+               ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * 
numElements);
+               buffer.order(ByteOrder.nativeOrder());
+               for(int i = 1; i <= numElements; i++) {
+                       buffer.putLong((long) i);
+               }
+               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.HASH64);
+               assertNotNull(result);
+               assertEquals(4, result.size());
+               assertEquals("1", result.get(0));
+               assertEquals("2", result.get(1));
+               assertEquals("3", result.get(2));
+               assertEquals("4", result.get(3));
+       }
+
+       @Test
+       public void testConvertFP32() {
+               int numElements = 4;
+               ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES * 
numElements);
+               buffer.order(ByteOrder.nativeOrder());
+               for(float i = 1.1f; i <= numElements + 1; i += 1.0) {
+                       buffer.putFloat(i);
+               }
+               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.FP32);
+               assertNotNull(result);
+               assertEquals(4, result.size());
+               assertEquals(1.1f, result.get(0));
+               assertEquals(2.1f, result.get(1));
+               assertEquals(3.1f, result.get(2));
+               assertEquals(4.1f, result.get(3));
+       }
+
+       @Test
+       public void testConvertFP64() {
+               int numElements = 4;
+               ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * 
numElements);
+               buffer.order(ByteOrder.nativeOrder());
+               for(double i = 1.1; i <= numElements + 1; i += 1.0) {
+                       buffer.putDouble(i);
+               }
+               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.FP64);
+               assertNotNull(result);
+               assertEquals(4, result.size());
+               assertEquals(1.1, result.get(0));
+               assertEquals(2.1, result.get(1));
+               assertEquals(3.1, result.get(2));
+               assertEquals(4.1, result.get(3));
+       }
+
+       @Test
+       public void testConvertBoolean() {
+               int numElements = 4;
+               byte[] data = {1, 0, 1, 0};
+               Array<?> result = Py4jConverterUtils.convert(data, numElements, 
Types.ValueType.BOOLEAN);
+               assertNotNull(result);
+               assertEquals(4, result.size());
+               assertEquals(true, result.get(0));
+               assertEquals(false, result.get(1));
+               assertEquals(true, result.get(2));
+               assertEquals(false, result.get(3));
+       }
+
+       @Test
+       public void testConvertString() {
+               int numElements = 2;
+               String[] strings = {"hello", "world"};
+               ByteBuffer buffer = ByteBuffer.allocate(4 + strings[0].length() 
+ 4 + strings[1].length());
+               buffer.order(ByteOrder.LITTLE_ENDIAN);
+               for(String s : strings) {
+                       buffer.putInt(s.length());
+                       buffer.put(s.getBytes(StandardCharsets.UTF_8));
+               }
+               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.STRING);
+               assertNotNull(result);
+               assertEquals(2, result.size());
+               assertEquals("hello", result.get(0));
+               assertEquals("world", result.get(1));
+       }
+
+       @Test
+       public void testConvertChar() {
+               char[] c = {'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 
'd'};
+               ByteBuffer buffer = ByteBuffer.allocate(Character.BYTES * 
c.length);
+               buffer.order(ByteOrder.LITTLE_ENDIAN);
+               for(char s : c) {
+                       buffer.putChar(s);
+               }
+               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
c.length, Types.ValueType.CHARACTER);
+               assertNotNull(result);
+               assertEquals(c.length, result.size());
+
+               for(int i = 0; i < c.length; i++) {
+                       assertEquals(c[i], result.get(i));
+               }
+       }
+
+       @Test
+       public void testConvertRow() {
+               int numElements = 4;
+               byte[] data = {1, 2, 3, 4};
+               Object[] row = Py4jConverterUtils.convertRow(data, numElements, 
Types.ValueType.UINT8);
+               assertNotNull(row);
+               assertEquals(4, row.length);
+               assertEquals(1, row[0]);
+               assertEquals(2, row[1]);
+               assertEquals(3, row[2]);
+               assertEquals(4, row[3]);
+       }
+
+       @Test
+       public void testConvertFused() {
+               int numElements = 1;
+               byte[] data = {1, 2, 3, 4};
+               Types.ValueType[] valueTypes = {ValueType.UINT8, 
ValueType.UINT8, ValueType.UINT8, ValueType.UINT8};
+               Array<?>[] arrays = Py4jConverterUtils.convertFused(data, 
numElements, valueTypes);
+               assertNotNull(arrays);
+               assertEquals(4, arrays.length);
+               for(int i = 0; i < 4; i++) {
+                       assertEquals(1 + i, arrays[i].get(0));
+               }
+       }
+
+       @Test(expected = Exception.class)
+       public void nullData() {
+               Py4jConverterUtils.convert(null, 14, ValueType.BOOLEAN);
+       }
+
+       @Test(expected = Exception.class)
+       public void nullValueType() {
+               Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, null);
+       }
+
+       @Test(expected = Exception.class)
+       public void unknownValueType() {
+               Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, 
ValueType.UNKNOWN);
+       }
+
+       @Test
+       public void testConvertPy4JArrayToMBFP64() {
+               int rlen = 2;
+               int clen = 3;
+               ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * rlen * clen);
+               buffer.order(ByteOrder.nativeOrder());
+               double[] values = {1.1, 2.2, 3.3, 4.4, 5.5, 6.6};
+               for(double val : values) {
+                       buffer.putDouble(val);
+               }
+               MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen);
+               assertNotNull(mb);
+               assertEquals(rlen, mb.getNumRows());
+               assertEquals(clen, mb.getNumColumns());
+               assertEquals(1.1, mb.get(0, 0), 0.0001);
+               assertEquals(2.2, mb.get(0, 1), 0.0001);
+               assertEquals(3.3, mb.get(0, 2), 0.0001);
+               assertEquals(4.4, mb.get(1, 0), 0.0001);
+               assertEquals(5.5, mb.get(1, 1), 0.0001);
+               assertEquals(6.6, mb.get(1, 2), 0.0001);
+       }
+
+       @Test
+       public void testConvertPy4JArrayToMBUINT8() {
+               int rlen = 2;
+               int clen = 2;
+               byte[] data = {(byte) 1, (byte) 2, (byte) 3, (byte) 4};
+               MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(data, rlen, clen, ValueType.UINT8);
+               assertNotNull(mb);
+               assertEquals(rlen, mb.getNumRows());
+               assertEquals(clen, mb.getNumColumns());
+               assertEquals(1.0, mb.get(0, 0), 0.0001);
+               assertEquals(2.0, mb.get(0, 1), 0.0001);
+               assertEquals(3.0, mb.get(1, 0), 0.0001);
+               assertEquals(4.0, mb.get(1, 1), 0.0001);
+       }
+
+       @Test
+       public void testConvertPy4JArrayToMBINT32() {
+               int rlen = 2;
+               int clen = 2;
+               ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * rlen * clen);
+               buffer.order(ByteOrder.nativeOrder());
+               int[] values = {10, 20, 30, 40};
+               for(int val : values) {
+                       buffer.putInt(val);
+               }
+               MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen, ValueType.INT32);
+               assertNotNull(mb);
+               assertEquals(rlen, mb.getNumRows());
+               assertEquals(clen, mb.getNumColumns());
+               assertEquals(10.0, mb.get(0, 0), 0.0001);
+               assertEquals(20.0, mb.get(0, 1), 0.0001);
+               assertEquals(30.0, mb.get(1, 0), 0.0001);
+               assertEquals(40.0, mb.get(1, 1), 0.0001);
+       }
+
+       @Test
+       public void testConvertPy4JArrayToMBFP32() {
+               int rlen = 2;
+               int clen = 2;
+               ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES * rlen * clen);
+               buffer.order(ByteOrder.nativeOrder());
+               float[] values = {1.5f, 2.5f, 3.5f, 4.5f};
+               for(float val : values) {
+                       buffer.putFloat(val);
+               }
+               MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen, ValueType.FP32);
+               assertNotNull(mb);
+               assertEquals(rlen, mb.getNumRows());
+               assertEquals(clen, mb.getNumColumns());
+               assertEquals(1.5f, mb.get(0, 0), 0.0001);
+               assertEquals(2.5f, mb.get(0, 1), 0.0001);
+               assertEquals(3.5f, mb.get(1, 0), 0.0001);
+               assertEquals(4.5f, mb.get(1, 1), 0.0001);
+       }
+
+       @Test(expected = DMLRuntimeException.class)
+       public void testConvertPy4JArrayToMBSparseNotSupported() {
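+               // requesting sparse output from the dense byte-array converter is expected to throw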
+               int rlen = 2;
+               int clen = 2;
+               byte[] data = {1, 2, 3, 4};
+               Py4jConverterUtils.convertPy4JArrayToMB(data, rlen, clen, true, ValueType.UINT8);
+       }
+
+       @Test
+       public void testConvertSciPyCOOToMB() {
+               int rlen = 10;
+               int clen = 10;
+               int nnz = 3;
+               // Create COO format: values at (0,0)=1.0, (1,2)=2.0, (2,1)=3.0
+               ByteBuffer dataBuf = ByteBuffer.allocate(Double.BYTES * nnz);
+               dataBuf.order(ByteOrder.nativeOrder());
+               dataBuf.putDouble(1.0);
+               dataBuf.putDouble(2.0);
+               dataBuf.putDouble(3.0);
+               
+               ByteBuffer rowBuf = ByteBuffer.allocate(Integer.BYTES * nnz);
+               rowBuf.order(ByteOrder.nativeOrder());
+               rowBuf.putInt(0);
+               rowBuf.putInt(1);
+               rowBuf.putInt(2);
+               
+               ByteBuffer colBuf = ByteBuffer.allocate(Integer.BYTES * nnz);
+               colBuf.order(ByteOrder.nativeOrder());
+               colBuf.putInt(0);
+               colBuf.putInt(2);
+               colBuf.putInt(1);
+               
+               MatrixBlock mb = Py4jConverterUtils.convertSciPyCOOToMB(
+                       dataBuf.array(), rowBuf.array(), colBuf.array(), rlen, clen, nnz);
+               assertNotNull(mb);
+               assertEquals(rlen, mb.getNumRows());
+               assertEquals(clen, mb.getNumColumns());
+               assertTrue(mb.isInSparseFormat());
+               assertEquals(1.0, mb.get(0, 0), 0.0001);
+               assertEquals(2.0, mb.get(1, 2), 0.0001);
+               assertEquals(3.0, mb.get(2, 1), 0.0001);
+               assertEquals(0.0, mb.get(0, 1), 0.0001);
+               assertEquals(0.0, mb.get(1, 0), 0.0001);
+       }
+
+       @Test
+       public void testConvertSciPyCSRToMB() {
+               int rlen = 10;
+               int clen = 10;
+               int nnz = 3;
+               // Create CSR format: values at (0,0)=1.0, (1,2)=2.0, (2,1)=3.0
+               ByteBuffer dataBuf = ByteBuffer.allocate(Double.BYTES * nnz);
+               dataBuf.order(ByteOrder.nativeOrder());
+               dataBuf.putDouble(1.0);
+               dataBuf.putDouble(2.0);
+               dataBuf.putDouble(3.0);
+               
+               ByteBuffer indicesBuf = ByteBuffer.allocate(Integer.BYTES * nnz);
+               indicesBuf.order(ByteOrder.nativeOrder());
+               indicesBuf.putInt(0);  // column for row 0
+               indicesBuf.putInt(2);  // column for row 1
+               indicesBuf.putInt(1);  // column for row 2
+               
+               ByteBuffer indptrBuf = ByteBuffer.allocate(Integer.BYTES * (rlen + 1));
+               indptrBuf.order(ByteOrder.nativeOrder());
+               indptrBuf.putInt(0);  // row 0 starts at index 0
+               indptrBuf.putInt(1);  // row 1 starts at index 1
+               indptrBuf.putInt(2);  // row 2 starts at index 2
+               indptrBuf.putInt(3);  // end marker
+               
+               MatrixBlock mb = Py4jConverterUtils.convertSciPyCSRToMB(
+                       dataBuf.array(), indicesBuf.array(), indptrBuf.array(), rlen, clen, nnz);
+               assertNotNull(mb);
+               assertEquals(rlen, mb.getNumRows());
+               assertEquals(clen, mb.getNumColumns());
+               assertTrue(mb.isInSparseFormat());
+               assertEquals(1.0, mb.get(0, 0), 0.0001);
+               assertEquals(2.0, mb.get(1, 2), 0.0001);
+               assertEquals(3.0, mb.get(2, 1), 0.0001);
+               assertEquals(0.0, mb.get(0, 1), 0.0001);
+               assertEquals(0.0, mb.get(1, 0), 0.0001);
+       }
+
+       @Test
+       public void testAllocateDenseOrSparseDense() {
+               int rlen = 5;
+               int clen = 5;
+               MatrixBlock mb = Py4jConverterUtils.allocateDenseOrSparse(rlen, clen, false);
+               assertNotNull(mb);
+               assertEquals(rlen, mb.getNumRows());
+               assertEquals(clen, mb.getNumColumns());
+               assertTrue(!mb.isInSparseFormat());
+       }
+
+       @Test
+       public void testAllocateDenseOrSparseSparse() {
+               int rlen = 5;
+               int clen = 5;
+               MatrixBlock mb = Py4jConverterUtils.allocateDenseOrSparse(rlen, clen, true);
+               assertNotNull(mb);
+               assertEquals(rlen, mb.getNumRows());
+               assertEquals(clen, mb.getNumColumns());
+               assertTrue(mb.isInSparseFormat());
+       }
+
+       @Test
+       public void testAllocateDenseOrSparseLong() {
+               long rlen = 10L;
+               long clen = 10L;
+               MatrixBlock mb = Py4jConverterUtils.allocateDenseOrSparse(rlen, clen, false);
+               assertNotNull(mb);
+               assertEquals((int) rlen, mb.getNumRows());
+               assertEquals((int) clen, mb.getNumColumns());
+       }
+
+       @Test(expected = DMLRuntimeException.class)
+       public void testAllocateDenseOrSparseLongTooLarge() {
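+               // dimensions beyond Integer.MAX_VALUE cannot fit a single MatrixBlock and must be rejected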
+               long rlen = Integer.MAX_VALUE + 1L;
+               long clen = 10L;
+               Py4jConverterUtils.allocateDenseOrSparse(rlen, clen, false);
+       }
+
+       @Test
+       public void testConvertMBtoPy4JDenseArr() {
+               int rlen = 2;
+               int clen = 2;
+               MatrixBlock mb = new MatrixBlock(rlen, clen, false);
+               mb.allocateBlock();
+               mb.set(0, 0, 1.0);
+               mb.set(0, 1, 2.0);
+               mb.set(1, 0, 3.0);
+               mb.set(1, 1, 4.0);
+               
+               byte[] result = Py4jConverterUtils.convertMBtoPy4JDenseArr(mb);
+               assertNotNull(result);
+               assertEquals(Double.BYTES * rlen * clen, result.length);
+               
+               ByteBuffer buffer = ByteBuffer.wrap(result);
+               buffer.order(ByteOrder.nativeOrder());
+               assertEquals(1.0, buffer.getDouble(), 0.0001);
+               assertEquals(2.0, buffer.getDouble(), 0.0001);
+               assertEquals(3.0, buffer.getDouble(), 0.0001);
+               assertEquals(4.0, buffer.getDouble(), 0.0001);
+       }
+
+       @Test
+       public void testConvertMBtoPy4JDenseArrRoundTrip() {
+               int rlen = 2;
+               int clen = 3;
+               ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * rlen * clen);
+               buffer.order(ByteOrder.nativeOrder());
+               double[] values = {1.1, 2.2, 3.3, 4.4, 5.5, 6.6};
+               for(double val : values) {
+                       buffer.putDouble(val);
+               }
+               
+               MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen);
+               byte[] result = Py4jConverterUtils.convertMBtoPy4JDenseArr(mb);
+               
+               ByteBuffer resultBuffer = ByteBuffer.wrap(result);
+               resultBuffer.order(ByteOrder.nativeOrder());
+               for(double expected : values) {
+                       assertEquals(expected, resultBuffer.getDouble(), 0.0001);
+               }
+       }
+
+       @Test
+       public void testConvertMBtoPy4JDenseArrSparseToDense() {
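+               // instantiate the utility class once; the conversion itself uses static methods only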
+               new Py4jConverterUtils();
+               int rlen = 3;
+               int clen = 3;
+               MatrixBlock mb = new MatrixBlock(rlen, clen, true);
+               mb.allocateSparseRowsBlock(false);
+               mb.set(0, 0, 1.0);
+               mb.set(2, 2, 2.0);
+               
+               byte[] result = Py4jConverterUtils.convertMBtoPy4JDenseArr(mb);
+               assertNotNull(result);
+               assertEquals(Double.BYTES * rlen * clen, result.length);
+               
+               ByteBuffer buffer = ByteBuffer.wrap(result);
+               buffer.order(ByteOrder.nativeOrder());
+               assertEquals(1.0, buffer.getDouble(), 0.0001);
+               // Skip to position (2,2) = index 8
+               for(int i = 1; i < 8; i++) {
+                       assertEquals(0.0, buffer.getDouble(), 0.0001);
+               }
+               assertEquals(2.0, buffer.getDouble(), 0.0001);
+       }
+}
