This is an automated email from the ASF dual-hosted git repository.
estrauss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 3f7b17badb [SYSTEMDS-3902] Sparse data transfer: Python --> Java
3f7b17badb is described below
commit 3f7b17badbf81ace6e811c2194a8817456ec4de9
Author: e-strauss <[email protected]>
AuthorDate: Thu Feb 12 15:47:43 2026 +0100
[SYSTEMDS-3902] Sparse data transfer: Python --> Java
This commit implements optimized data transfer for Scipy sparse matrices
from Python to the Java runtime. Key changes include the addition of
`convertSciPyCSRToMB` and `convertSciPyCOOToMB` in the Java utility layer to
directly handle compressed sparse row and coordinate formats. On the Python
side, the `SystemDSContext` now supports a `sparse_data_transfer` flag and a
new `from_py` method to unify data ingestion. These updates allow sparse data
to be transferred without being converted [...]
Closes #2379.
---
.../sysds/runtime/util/Py4jConverterUtils.java | 42 ++
.../python/systemds/context/systemds_context.py | 79 +++-
src/main/python/systemds/utils/converters.py | 327 +++++++------
.../python/tests/matrix/test_block_converter.py | 145 +++++-
.../frame/array/Py4jConverterUtilsTest.java | 240 ----------
.../component/utils/Py4jConverterUtilsTest.java | 510 +++++++++++++++++++++
6 files changed, 952 insertions(+), 391 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
index 7faee722d0..f705a3c9c0 100644
--- a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
+import org.apache.log4j.Logger;
import org.apache.sysds.common.Types;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.columns.Array;
@@ -35,6 +36,7 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
* Utils for converting python data to java.
*/
public class Py4jConverterUtils {
+ private static final Logger LOG =
Logger.getLogger(Py4jConverterUtils.class);
public static MatrixBlock convertPy4JArrayToMB(byte[] data, int rlen,
int clen) {
return convertPy4JArrayToMB(data, rlen, clen, false,
Types.ValueType.FP64);
}
@@ -63,6 +65,45 @@ public class Py4jConverterUtils {
return mb;
}
+ public static MatrixBlock convertSciPyCSRToMB(byte[] data, byte[]
indices, byte[] indptr, int rlen, int clen, int nnz) {
+ LOG.debug("Converting compressed sparse row matrix to
MatrixBlock");
+ MatrixBlock mb = new MatrixBlock(rlen, clen, true);
+ mb.allocateSparseRowsBlock(false);
+ ByteBuffer dataBuf = ByteBuffer.wrap(data);
+ dataBuf.order(ByteOrder.nativeOrder());
+ ByteBuffer indicesBuf = ByteBuffer.wrap(indices);
+ indicesBuf.order(ByteOrder.nativeOrder());
+ ByteBuffer indptrBuf = ByteBuffer.wrap(indptr);
+ indptrBuf.order(ByteOrder.nativeOrder());
+
+ // Read indptr array to get row boundaries
+ int[] rowPtrs = new int[rlen + 1];
+ for(int i = 0; i <= rlen; i++) {
+ rowPtrs[i] = indptrBuf.getInt();
+ }
+
+ // Iterate through each row
+ for(int row = 0; row < rlen; row++) {
+ int startIdx = rowPtrs[row];
+ int endIdx = rowPtrs[row + 1];
+
+ // Set buffer positions to the start of this row
+ dataBuf.position(startIdx * Double.BYTES);
+ indicesBuf.position(startIdx * Integer.BYTES);
+
+ // Process all non-zeros in this row sequentially
+ for(int idx = startIdx; idx < endIdx; idx++) {
+ double val = dataBuf.getDouble();
+ int colIndex = indicesBuf.getInt();
+ mb.set(row, colIndex, val);
+ }
+ }
+
+ mb.recomputeNonZeros();
+ mb.examSparsity();
+ return mb;
+ }
+
public static MatrixBlock allocateDenseOrSparse(int rlen, int clen,
boolean isSparse) {
MatrixBlock ret = new MatrixBlock(rlen, clen, isSparse);
ret.allocateBlock();
@@ -208,6 +249,7 @@ public class Py4jConverterUtils {
public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb) {
byte[] ret = null;
if(mb.isInSparseFormat()) {
+ LOG.debug("Converting sparse matrix to dense");
mb.sparseToDense();
}
diff --git a/src/main/python/systemds/context/systemds_context.py
b/src/main/python/systemds/context/systemds_context.py
index 99a6cba57b..7268e5b86a 100644
--- a/src/main/python/systemds/context/systemds_context.py
+++ b/src/main/python/systemds/context/systemds_context.py
@@ -29,17 +29,19 @@ import socket
import sys
import struct
import traceback
+import warnings
from contextlib import contextmanager
from glob import glob
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread
-from time import sleep, time
+from time import sleep
from typing import Dict, Iterable, Sequence, Tuple, Union
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import pandas as pd
+import scipy.sparse as sp
from py4j.java_gateway import GatewayParameters, JavaGateway, Py4JNetworkError
from systemds.operator import (
Frame,
@@ -77,6 +79,7 @@ class SystemDSContext(object):
_FIFO_JAVA2PY_PIPES = []
_data_transfer_mode = 0
_multi_pipe_enabled = False
+ _sparse_data_transfer = True
_logging_initialized = False
_executor_pool = ThreadPoolExecutor(max_workers=os.cpu_count() * 2 or 4)
@@ -89,6 +92,7 @@ class SystemDSContext(object):
py4j_logging_level: int = 50,
data_transfer_mode: int = 1,
multi_pipe_enabled: bool = False,
+ sparse_data_transfer: bool = True,
):
"""Starts a new instance of SystemDSContext, in which the connection
to a JVM systemds instance is handled
Any new instance of this SystemDS Context, would start a separate new
JVM.
@@ -103,14 +107,26 @@ class SystemDSContext(object):
The logging levels are as follows: 10 DEBUG, 20 INFO, 30 WARNING,
40 ERROR, 50 CRITICAL.
:param py4j_logging_level: The logging level for Py4j to use, since
all communication to the JVM is done through this,
it can be verbose if not set high.
- :param data_transfer_mode: default 0,
+ :param data_transfer_mode: default 0, 0 for py4j, 1 for using pipes
(on unix systems)
+ :param multi_pipe_enabled: default False, if True, use multiple pipes
for data transfer
+ only used if data_transfer_mode is 1.
+ .. experimental:: This parameter is experimental and may be
removed in a future version.
+ :param sparse_data_transfer: default True, if True, use optimized
sparse matrix transfer,
+ if False, convert sparse matrices to dense arrays before transfer
"""
+ if multi_pipe_enabled:
+ warnings.warn(
+ "The 'multi_pipe_enabled' parameter is experimental and may be
removed in a future version.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
self.__setup_logging(logging_level, py4j_logging_level)
self.__start(port, capture_stdout)
self.capture_stats(capture_statistics)
self._log.debug("Started JVM and SystemDS python context manager")
self.__setup_data_transfer(data_transfer_mode, multi_pipe_enabled)
+ self._sparse_data_transfer = sparse_data_transfer
def __setup_data_transfer(self, data_transfer_mode=0,
multi_pipe_enabled=False):
self._data_transfer_mode = data_transfer_mode
@@ -769,21 +785,65 @@ class SystemDSContext(object):
# therefore the output type is assign.
return Scalar(self, v, assign=True)
+ def from_py(
+ self,
+ src: Union[np.ndarray, sp.spmatrix, pd.DataFrame, pd.Series],
+ *args: Sequence[VALID_INPUT_TYPES],
+ **kwargs: Dict[str, VALID_INPUT_TYPES],
+ ) -> Union[Matrix, Frame]:
+ """Generate DAGNode representing data given by a python object, which
will be sent to SystemDS on need.
+ :param src: the python object
+ :param args: unnamed parameters
+ :param kwargs: named parameters
+ :return: A Matrix or Frame Node
+ """
+
+ def get_params(src, args, kwargs):
+ unnamed_params = ["'./tmp/{file_name}'"]
+ if len(src.shape) == 2:
+ named_params = {"rows": src.shape[0], "cols": src.shape[1]}
+ elif len(src.shape) == 1:
+ named_params = {"rows": src.shape[0], "cols": 1}
+ else:
+ # TODO Support tensors.
+ raise ValueError("Only two dimensional arrays supported")
+ unnamed_params.extend(args)
+ named_params.update(kwargs)
+ return unnamed_params, named_params
+
+ if isinstance(src, np.ndarray):
+ unnamed_params, named_params = get_params(src, args, kwargs)
+ return Matrix(self, "read", unnamed_params, named_params,
local_data=src)
+ elif isinstance(src, sp.spmatrix):
+ unnamed_params, named_params = get_params(src, args, kwargs)
+ return Matrix(self, "read", unnamed_params, named_params,
local_data=src)
+ elif isinstance(src, pd.DataFrame):
+ unnamed_params, named_params = get_params(src, args, kwargs)
+ named_params["data_type"] = '"frame"'
+ return Frame(self, "read", unnamed_params, named_params,
local_data=src)
+ elif isinstance(src, pd.Series):
+ unnamed_params, named_params = get_params(src, args, kwargs)
+ named_params["data_type"] = '"frame"'
+ return Frame(self, "read", unnamed_params, named_params,
local_data=src)
+ else:
+ raise ValueError(f"Unsupported data type: {type(src)}")
+
def from_numpy(
self,
- mat: np.array,
+ mat: Union[np.ndarray, sp.spmatrix],
*args: Sequence[VALID_INPUT_TYPES],
**kwargs: Dict[str, VALID_INPUT_TYPES],
) -> Matrix:
- """Generate DAGNode representing matrix with data given by a numpy
array, which will be sent to SystemDS
- on need.
+ """Generate DAGNode representing matrix with data given by a numpy
array or scipy sparse matrix,
+ which will be sent to SystemDS on need.
- :param mat: the numpy array
+ :param mat: the numpy array or scipy sparse matrix
:param args: unnamed parameters
:param kwargs: named parameters
:return: A Matrix
+ Note: This method is deprecated. Use from_py instead.
"""
-
+ self._log.warning(f"Deprecated method from_numpy. Use from_py
instead.")
unnamed_params = ["'./tmp/{file_name}'"]
if len(mat.shape) == 2:
@@ -811,7 +871,9 @@ class SystemDSContext(object):
:param args: unnamed parameters
:param kwargs: named parameters
:return: A Frame
+ Note: This method is deprecated. Use from_py instead.
"""
+ self._log.warning(f"Deprecated method from_pandas. Use from_py
instead.")
unnamed_params = ["'./tmp/{file_name}'"]
if len(df.shape) == 2:
@@ -824,9 +886,6 @@ class SystemDSContext(object):
unnamed_params.extend(args)
named_params["data_type"] = '"frame"'
-
- self._pd_dataframe = df
-
named_params.update(kwargs)
return Frame(self, "read", unnamed_params, named_params, local_data=df)
diff --git a/src/main/python/systemds/utils/converters.py
b/src/main/python/systemds/utils/converters.py
index 5f4619a8bb..7cabf2aabd 100644
--- a/src/main/python/systemds/utils/converters.py
+++ b/src/main/python/systemds/utils/converters.py
@@ -21,11 +21,13 @@
import struct
from time import time
+from typing import Union
import numpy as np
import pandas as pd
import concurrent.futures
from py4j.java_gateway import JavaClass, JavaGateway, JavaObject, JVMView
import os
+import scipy.sparse as sp
# Constants
_HANDSHAKE_OFFSET = 1000
@@ -86,129 +88,6 @@ def _pipe_receive_bytes(pipe, view, offset, end,
batch_size_bytes, logger):
offset += actual_size
-def _pipe_receive_strings(
- pipe, num_strings, batch_size=_DEFAULT_BATCH_SIZE_BYTES, pipe_id=0,
logger=None
-):
- """
- Reads UTF-8 encoded strings from the pipe in batches.
- Format: <I (little-endian int32) length prefix, followed by UTF-8 bytes.
-
- Returns: tuple of (strings_list, total_time, decode_time, io_time,
num_strings)
- """
- t_total_start = time()
- t_decode = 0.0
- t_io = 0.0
-
- strings = []
- fd = pipe.fileno() # Cache file descriptor
-
- # Use a reusable buffer to avoid repeated allocations
- buf = bytearray(batch_size * 2)
- buf_pos = 0
- buf_remaining = 0 # Number of bytes already in buffer
-
- i = 0
- while i < num_strings:
- # If we don't have enough bytes for the length prefix, read more
- if buf_remaining < _STRING_LENGTH_PREFIX_SIZE:
- # Shift remaining bytes to start of buffer
- if buf_remaining > 0:
- buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining]
-
- # Read more data
- t0 = time()
- chunk = os.read(fd, batch_size)
- t_io += time() - t0
- if not chunk:
- raise IOError("Pipe read returned empty data unexpectedly")
-
- # Append new data to buffer
- chunk_len = len(chunk)
- if buf_remaining + chunk_len > len(buf):
- # Grow buffer if needed
- new_buf = bytearray(len(buf) * 2)
- new_buf[:buf_remaining] = buf[:buf_remaining]
- buf = new_buf
-
- buf[buf_remaining : buf_remaining + chunk_len] = chunk
- buf_remaining += chunk_len
- buf_pos = 0
-
- # Read length prefix (little-endian int32)
- # Note: length can be -1 (0xFFFFFFFF) to indicate null value
- length = struct.unpack(
- "<i", buf[buf_pos : buf_pos + _STRING_LENGTH_PREFIX_SIZE]
- )[0]
- buf_pos += _STRING_LENGTH_PREFIX_SIZE
- buf_remaining -= _STRING_LENGTH_PREFIX_SIZE
-
- # Handle null value (marked by -1)
- if length == -1:
- strings.append(None)
- i += 1
- continue
-
- # If we don't have enough bytes for the string data, read more
- if buf_remaining < length:
- # Shift remaining bytes to start of buffer
- if buf_remaining > 0:
- buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining]
- buf_pos = 0
-
- # Read more data until we have enough
- bytes_needed = length - buf_remaining
- while bytes_needed > 0:
- t0 = time()
- chunk = os.read(fd, min(batch_size, bytes_needed))
- t_io += time() - t0
- if not chunk:
- raise IOError("Pipe read returned empty data unexpectedly")
-
- chunk_len = len(chunk)
- if buf_remaining + chunk_len > len(buf):
- # Grow buffer if needed
- new_buf = bytearray(len(buf) * 2)
- new_buf[:buf_remaining] = buf[:buf_remaining]
- buf = new_buf
-
- buf[buf_remaining : buf_remaining + chunk_len] = chunk
- buf_remaining += chunk_len
- bytes_needed -= chunk_len
-
- # Decode the string
- t0 = time()
- if length == 0:
- decoded_str = ""
- else:
- decoded_str = buf[buf_pos : buf_pos + length].decode("utf-8")
- t_decode += time() - t0
-
- strings.append(decoded_str)
- buf_pos += length
- buf_remaining -= length
- i += 1
- header_received = False
- if buf_remaining == _STRING_LENGTH_PREFIX_SIZE:
- # There is still data in the buffer, probably the handshake header
- received = struct.unpack(
- "<i", buf[buf_pos : buf_pos + _STRING_LENGTH_PREFIX_SIZE]
- )[0]
- if received != pipe_id + _HANDSHAKE_OFFSET:
- raise ValueError(
- "Handshake mismatch: expected {}, got {}".format(
- pipe_id + _HANDSHAKE_OFFSET, received
- )
- )
- header_received = True
- elif buf_remaining > _STRING_LENGTH_PREFIX_SIZE:
- raise ValueError(
- "Unexpected number of bytes in buffer: {}".format(buf_remaining)
- )
-
- t_total = time() - t_total_start
- return (strings, t_total, t_decode, t_io, num_strings, header_received)
-
-
def _get_numpy_value_type(jvm, dtype):
"""Maps numpy dtype to SystemDS ValueType."""
if dtype is np.dtype(np.uint8):
@@ -280,37 +159,43 @@ def _transfer_matrix_block_multi_pipe(
return fut_java.result() # Java returns MatrixBlock
-def numpy_to_matrix_block(sds, np_arr: np.array):
- """Converts a given numpy array, to internal matrix block representation.
+def numpy_to_matrix_block(sds, arr: Union[np.ndarray, sp.spmatrix]):
+ """Converts a given numpy array or scipy sparse matrix to internal matrix
block representation.
:param sds: The current systemds context.
- :param np_arr: the numpy array to convert to matrixblock.
+ :param arr: the numpy array or scipy sparse matrix to convert to
matrixblock.
"""
- assert np_arr.ndim <= 2, "np_arr invalid, because it has more than 2
dimensions"
- rows = np_arr.shape[0]
- cols = np_arr.shape[1] if np_arr.ndim == 2 else 1
+ assert arr.ndim <= 2, "np_arr invalid, because it has more than 2
dimensions"
+ rows = arr.shape[0]
+ cols = arr.shape[1] if arr.ndim == 2 else 1
if rows > 2147483647:
raise ValueError("Matrix rows exceed maximum value (2147483647)")
# If not numpy array then convert to numpy array
- if not isinstance(np_arr, np.ndarray):
- np_arr = np.asarray(np_arr, dtype=np.float64)
+ if isinstance(arr, sp.spmatrix):
+ if sds._sparse_data_transfer:
+ return scipy_sparse_matrix_to_matrix_block(sds, arr)
+ else:
+ # Convert sparse matrix to dense array
+ arr = arr.toarray()
+ if not isinstance(arr, np.ndarray):
+ arr = np.asarray(arr, dtype=np.float64)
jvm: JVMView = sds.java_gateway.jvm
ep = sds.java_gateway.entry_point
# Flatten and set value type
- if np_arr.dtype is np.dtype(np.uint8):
- arr = np_arr.ravel()
- elif np_arr.dtype is np.dtype(np.int32):
- arr = np_arr.ravel()
- elif np_arr.dtype is np.dtype(np.float32):
- arr = np_arr.ravel()
+ if arr.dtype is np.dtype(np.uint8):
+ arr = arr.ravel()
+ elif arr.dtype is np.dtype(np.int32):
+ arr = arr.ravel()
+ elif arr.dtype is np.dtype(np.float32):
+ arr = arr.ravel()
else:
- arr = np_arr.ravel().astype(np.float64)
+ arr = arr.ravel().astype(np.float64)
- value_type = _get_numpy_value_type(jvm, np_arr.dtype)
+ value_type = _get_numpy_value_type(jvm, arr.dtype)
if sds._data_transfer_mode == 1:
mv = memoryview(arr).cast("B")
@@ -334,7 +219,7 @@ def numpy_to_matrix_block(sds, np_arr: np.array):
)
else:
return _transfer_matrix_block_multi_pipe(
- sds, mv, arr, np_arr, total_bytes, rows, cols, value_type, ep,
jvm
+ sds, mv, arr, arr, total_bytes, rows, cols, value_type, ep, jvm
)
else:
# Prepare byte buffer and send data to java via Py4J
@@ -343,6 +228,45 @@ def numpy_to_matrix_block(sds, np_arr: np.array):
return j_class.convertPy4JArrayToMB(buf, rows, cols, value_type)
+def scipy_sparse_matrix_to_matrix_block(sds, arr: sp.spmatrix):
+ """Converts a given scipy sparse matrix to an internal matrix block
representation.
+
+ :param sds: The current systemds context.
+ :param arr: The scipy sparse matrix to convert to matrixblock.
+ """
+ jvm: JVMView = sds.java_gateway.jvm
+
+ if sds._data_transfer_mode == 1:
+ # single pipe implementation
+ pass
+ else:
+ # py4j implementation
+ j_class: JavaClass =
jvm.org.apache.sysds.runtime.util.Py4jConverterUtils
+ if isinstance(arr, sp.csr_matrix):
+ data = arr.data.tobytes()
+ indices = arr.indices.tobytes()
+ indptr = arr.indptr.tobytes()
+ nnz = arr.nnz
+ rows = arr.shape[0]
+ cols = arr.shape[1]
+ # convertSciPyCSRToMB(byte[] data, byte[] indices, byte[] indptr,
int rlen, int clen, int nnz)
+ return j_class.convertSciPyCSRToMB(data, indices, indptr, rows,
cols, nnz)
+ elif isinstance(arr, sp.coo_matrix):
+ data = arr.data.tobytes()
+ row = arr.row.tobytes()
+ col = arr.col.tobytes()
+ nnz = arr.nnz
+ rows = arr.shape[0]
+ cols = arr.shape[1]
+ # convertSciPyCOOToMB(byte[] data, byte[] row, byte[] col, int
rlen, int clen, int nnz)
+ return j_class.convertSciPyCOOToMB(data, row, col, rows, cols, nnz)
+ else:
+ sds._log.info(
+ f"Converting unsupported sparse matrix type:
{type(arr).__name__} to CSR for efficient transfer."
+ )
+ return scipy_sparse_matrix_to_matrix_block(sds, arr.tocsr())
+
+
def matrix_block_to_numpy(sds, mb: JavaObject):
"""Converts a MatrixBlock object in the JVM to a numpy array.
@@ -761,6 +685,129 @@ def _pipe_transfer_strings(pipe, pd_series,
batch_size=_DEFAULT_BATCH_SIZE_BYTES
return (t_total, t_encoding, t_packing, t_io, num_strings)
+def _pipe_receive_strings(
+ pipe, num_strings, batch_size=_DEFAULT_BATCH_SIZE_BYTES, pipe_id=0,
logger=None
+):
+ """
+ Reads UTF-8 encoded strings from the pipe in batches.
+ Format: <I (little-endian int32) length prefix, followed by UTF-8 bytes.
+
+ Returns: tuple of (strings_list, total_time, decode_time, io_time,
num_strings)
+ """
+ t_total_start = time()
+ t_decode = 0.0
+ t_io = 0.0
+
+ strings = []
+ fd = pipe.fileno() # Cache file descriptor
+
+ # Use a reusable buffer to avoid repeated allocations
+ buf = bytearray(batch_size * 2)
+ buf_pos = 0
+ buf_remaining = 0 # Number of bytes already in buffer
+
+ i = 0
+ while i < num_strings:
+ # If we don't have enough bytes for the length prefix, read more
+ if buf_remaining < _STRING_LENGTH_PREFIX_SIZE:
+ # Shift remaining bytes to start of buffer
+ if buf_remaining > 0:
+ buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining]
+
+ # Read more data
+ t0 = time()
+ chunk = os.read(fd, batch_size)
+ t_io += time() - t0
+ if not chunk:
+ raise IOError("Pipe read returned empty data unexpectedly")
+
+ # Append new data to buffer
+ chunk_len = len(chunk)
+ if buf_remaining + chunk_len > len(buf):
+ # Grow buffer if needed
+ new_buf = bytearray(len(buf) * 2)
+ new_buf[:buf_remaining] = buf[:buf_remaining]
+ buf = new_buf
+
+ buf[buf_remaining : buf_remaining + chunk_len] = chunk
+ buf_remaining += chunk_len
+ buf_pos = 0
+
+ # Read length prefix (little-endian int32)
+ # Note: length can be -1 (0xFFFFFFFF) to indicate null value
+ length = struct.unpack(
+ "<i", buf[buf_pos : buf_pos + _STRING_LENGTH_PREFIX_SIZE]
+ )[0]
+ buf_pos += _STRING_LENGTH_PREFIX_SIZE
+ buf_remaining -= _STRING_LENGTH_PREFIX_SIZE
+
+ # Handle null value (marked by -1)
+ if length == -1:
+ strings.append(None)
+ i += 1
+ continue
+
+ # If we don't have enough bytes for the string data, read more
+ if buf_remaining < length:
+ # Shift remaining bytes to start of buffer
+ if buf_remaining > 0:
+ buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining]
+ buf_pos = 0
+
+ # Read more data until we have enough
+ bytes_needed = length - buf_remaining
+ while bytes_needed > 0:
+ t0 = time()
+ chunk = os.read(fd, min(batch_size, bytes_needed))
+ t_io += time() - t0
+ if not chunk:
+ raise IOError("Pipe read returned empty data unexpectedly")
+
+ chunk_len = len(chunk)
+ if buf_remaining + chunk_len > len(buf):
+ # Grow buffer if needed
+ new_buf = bytearray(len(buf) * 2)
+ new_buf[:buf_remaining] = buf[:buf_remaining]
+ buf = new_buf
+
+ buf[buf_remaining : buf_remaining + chunk_len] = chunk
+ buf_remaining += chunk_len
+ bytes_needed -= chunk_len
+
+ # Decode the string
+ t0 = time()
+ if length == 0:
+ decoded_str = ""
+ else:
+ decoded_str = buf[buf_pos : buf_pos + length].decode("utf-8")
+ t_decode += time() - t0
+
+ strings.append(decoded_str)
+ buf_pos += length
+ buf_remaining -= length
+ i += 1
+ header_received = False
+ if buf_remaining == _STRING_LENGTH_PREFIX_SIZE:
+ # There is still data in the buffer, probably the handshake header
+ received = struct.unpack(
+ "<i", buf[buf_pos : buf_pos + _STRING_LENGTH_PREFIX_SIZE]
+ )[0]
+ if received != pipe_id + _HANDSHAKE_OFFSET:
+ raise ValueError(
+ "Handshake mismatch: expected {}, got {}".format(
+ pipe_id + _HANDSHAKE_OFFSET, received
+ )
+ )
+ header_received = True
+ elif buf_remaining > _STRING_LENGTH_PREFIX_SIZE:
+ raise ValueError(
+ "Unexpected number of bytes in buffer: {}".format(buf_remaining)
+ )
+
+ t_total = time() - t_total_start
+ return (strings, t_total, t_decode, t_io, num_strings, header_received)
+
+
def _get_elem_size_for_type(d_type):
"""Returns the element size in bytes for a given SystemDS type."""
return {
diff --git a/src/main/python/tests/matrix/test_block_converter.py
b/src/main/python/tests/matrix/test_block_converter.py
index ef4b28b1bc..afa9ec9963 100644
--- a/src/main/python/tests/matrix/test_block_converter.py
+++ b/src/main/python/tests/matrix/test_block_converter.py
@@ -26,6 +26,7 @@ import numpy as np
from py4j.java_gateway import JVMView
from systemds.context import SystemDSContext
from systemds.utils.converters import matrix_block_to_numpy,
numpy_to_matrix_block
+import scipy.sparse as sp
class Test_MatrixBlockConverter(unittest.TestCase):
@@ -35,7 +36,9 @@ class Test_MatrixBlockConverter(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.sds = SystemDSContext(capture_stdout=True, logging_level=50)
+ cls.sds = SystemDSContext(
+ capture_stdout=True, logging_level=50, data_transfer_mode=0
+ )
@classmethod
def tearDownClass(cls):
@@ -70,10 +73,150 @@ class Test_MatrixBlockConverter(unittest.TestCase):
array = np.array([rng.standard_normal(n) for x in range(k)])
self.convert_back_and_forth(array)
+ def test_random_sparse_csr_nxn(self):
+ n = 10
+ array = sp.rand(n, n, density=0.1, format="csr")
+ self.convert_back_and_forth(array)
+
+ def test_sparse_csr_rectangular(self):
+ """Test CSR conversion with rectangular matrices"""
+ array = sp.rand(5, 10, density=0.2, format="csr")
+ self.convert_back_and_forth(array)
+
+ def test_sparse_csr_known_values(self):
+ """Test CSR conversion with a known sparse matrix"""
+ # Create a known CSR matrix
+ data = np.array([1.0, 2.0, 3.0, 4.0])
+ row = np.array([0, 0, 1, 2])
+ col = np.array([0, 2, 1, 2])
+ array = sp.csr_matrix((data, (row, col)), shape=(3, 3))
+ self.convert_back_and_forth(array)
+
+ def test_empty_dense_0x0(self):
+ """Test conversion of empty 0x0 dense matrix"""
+ array = np.array([]).reshape(0, 0)
+ self.convert_back_and_forth(array)
+
+ def test_empty_dense_0xn(self):
+ """Test conversion of empty matrix with zero rows"""
+ array = np.array([]).reshape(0, 5)
+ self.convert_back_and_forth(array)
+
+ def test_empty_dense_nx0(self):
+ """Test conversion of empty matrix with zero columns"""
+ array = np.array([]).reshape(5, 0)
+ self.convert_back_and_forth(array)
+
+ def test_sparse_csr_empty_rows(self):
+ """Test CSR conversion with rows that have no non-zero entries"""
+ # 4x3 matrix: row 0 has 2 entries, row 1 is empty, row 2 has 1 entry,
row 3 is empty
+ data = np.array([1.0, 2.0, 3.0])
+ row = np.array([0, 0, 2])
+ col = np.array([0, 2, 1])
+ array = sp.csr_matrix((data, (row, col)), shape=(4, 3))
+ self.convert_back_and_forth(array)
+
+ def test_sparse_csr_first_and_last_row_empty(self):
+ """Test CSR with first and last rows empty"""
+ data = np.array([1.0, 2.0])
+ row = np.array([1, 1])
+ col = np.array([0, 1])
+ array = sp.csr_matrix((data, (row, col)), shape=(3, 2))
+ self.convert_back_and_forth(array)
+
+ def test_sparse_csr_all_zeros(self):
+ """Test CSR with no non-zero entries (empty structure)"""
+ array = sp.csr_matrix((3, 4))
+ self.convert_back_and_forth(array)
+
+ def test_sparse_coo_empty_rows(self):
+ """Test COO conversion with empty rows"""
+ data = np.array([1.0, 2.0])
+ row = np.array([0, 2])
+ col = np.array([1, 1])
+ array = sp.coo_matrix((data, (row, col)), shape=(4, 2))
+ self.convert_back_and_forth(array)
+
+ def test_dense_all_zeros(self):
+ """Test dense matrix with all zeros"""
+ array = np.zeros((4, 3))
+ self.convert_back_and_forth(array)
+
+ def test_dense_single_row(self):
+ """Test dense matrix with single row (1xn)"""
+ array = np.array([[1.0, 2.0, 3.0]])
+ self.convert_back_and_forth(array)
+
+ def test_dense_single_column(self):
+ """Test dense matrix with single column (nx1)"""
+ array = np.array([[1.0], [2.0], [3.0]])
+ self.convert_back_and_forth(array)
+
+ def test_random_sparse_coo_nxn(self):
+ n = 10
+ array = sp.rand(n, n, density=0.1, format="coo")
+ self.convert_back_and_forth(array)
+
+ def test_sparse_csr_empty_0x0(self):
+ """Test empty 0x0 CSR matrix"""
+ array = sp.csr_matrix((0, 0))
+ self.convert_back_and_forth(array)
+
+ def test_sparse_csr_empty_0xn(self):
+ """Test CSR with zero rows"""
+ array = sp.csr_matrix((0, 4))
+ self.convert_back_and_forth(array)
+
+ def test_sparse_csr_empty_nx0(self):
+ """Test CSR with zero columns"""
+ array = sp.csr_matrix((4, 0))
+ self.convert_back_and_forth(array)
+
+ def test_sparse_csr_single_element(self):
+ """Test 1x1 CSR with one non-zero"""
+ array = sp.csr_matrix(([3.14], ([0], [0])), shape=(1, 1))
+ self.convert_back_and_forth(array)
+
+ def test_sparse_csr_single_row(self):
+ """Test 1xn CSR (single row)"""
+ array = sp.csr_matrix(([1.0, 2.0], ([0, 0], [0, 2])), shape=(1, 4))
+ self.convert_back_and_forth(array)
+
+ def test_sparse_csr_single_column(self):
+ """Test nx1 CSR (single column)"""
+ array = sp.csr_matrix(([1.0, 2.0], ([0, 2], [0, 0])), shape=(4, 1))
+ self.convert_back_and_forth(array)
+
+ def test_sparse_csr_empty_columns(self):
+ """Test CSR where some columns have no non-zero entries"""
+ # 3x4 matrix: only columns 0 and 2 have entries
+ data = np.array([1.0, 2.0, 3.0])
+ row = np.array([0, 1, 2])
+ col = np.array([0, 2, 2])
+ array = sp.csr_matrix((data, (row, col)), shape=(3, 4))
+ self.convert_back_and_forth(array)
+
+ def test_sparse_coo_single_element(self):
+ """Test COO with single non-zero"""
+ array = sp.coo_matrix(([1.0], ([2], [3])), shape=(4, 5))
+ self.convert_back_and_forth(array)
+
+ def test_sparse_coo_all_zeros(self):
+ """Test COO with no non-zero entries"""
+ array = sp.coo_matrix((2, 3))
+ self.convert_back_and_forth(array)
+
+ def test_sparse_csc_rectangular(self):
+ """Test CSC conversion (fallback path: converted to dense in
converter)"""
+ array = sp.csc_matrix(([1.0, 2.0, 3.0], ([0, 1, 2], [0, 0, 1])),
shape=(3, 2))
+ self.convert_back_and_forth(array)
+
def convert_back_and_forth(self, array):
matrix_block = numpy_to_matrix_block(self.sds, array)
# use the ability to call functions on matrix_block.
returned = matrix_block_to_numpy(self.sds, matrix_block)
+ if isinstance(array, sp.spmatrix):
+ array = array.toarray()
self.assertTrue(np.allclose(array, returned))
diff --git
a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
b/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
deleted file mode 100644
index 466c3337d8..0000000000
---
a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.test.component.frame.array;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.sysds.common.Types;
-import org.apache.sysds.common.Types.ValueType;
-import org.apache.sysds.runtime.util.Py4jConverterUtils;
-import org.apache.sysds.runtime.frame.data.columns.Array;
-import org.junit.Test;
-
-public class Py4jConverterUtilsTest {
-
- @Test
- public void testConvertUINT8() {
- int numElements = 4;
- byte[] data = {1, 2, 3, 4};
- Array<?> result = Py4jConverterUtils.convert(data, numElements,
Types.ValueType.UINT8);
- assertNotNull(result);
- assertEquals(4, result.size());
- assertEquals(1, result.get(0));
- assertEquals(2, result.get(1));
- assertEquals(3, result.get(2));
- assertEquals(4, result.get(3));
- }
-
- @Test
- public void testConvertINT32() {
- int numElements = 4;
- ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES *
numElements);
- buffer.order(ByteOrder.nativeOrder());
- for(int i = 1; i <= numElements; i++) {
- buffer.putInt(i);
- }
- Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.INT32);
- assertNotNull(result);
- assertEquals(4, result.size());
- assertEquals(1, result.get(0));
- assertEquals(2, result.get(1));
- assertEquals(3, result.get(2));
- assertEquals(4, result.get(3));
- }
-
- @Test
- public void testConvertINT64() {
- int numElements = 4;
- ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES *
numElements);
- buffer.order(ByteOrder.nativeOrder());
- for(int i = 1; i <= numElements; i++) {
- buffer.putLong((long) i);
- }
- Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.INT64);
- assertNotNull(result);
- assertEquals(4, result.size());
- assertEquals(1L, result.get(0));
- assertEquals(2L, result.get(1));
- assertEquals(3L, result.get(2));
- assertEquals(4L, result.get(3));
- }
-
-
- @Test
- public void testConvertHASH32() {
- int numElements = 4;
- ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES *
numElements);
- buffer.order(ByteOrder.nativeOrder());
- for(int i = 1; i <= numElements; i++) {
- buffer.putInt(i);
- }
- Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.HASH32);
- assertNotNull(result);
- assertEquals(4, result.size());
- assertEquals("1", result.get(0));
- assertEquals("2", result.get(1));
- assertEquals("3", result.get(2));
- assertEquals("4", result.get(3));
- }
-
- @Test
- public void testConvertHASH64() {
- int numElements = 4;
- ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES *
numElements);
- buffer.order(ByteOrder.nativeOrder());
- for(int i = 1; i <= numElements; i++) {
- buffer.putLong((long) i);
- }
- Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.HASH64);
- assertNotNull(result);
- assertEquals(4, result.size());
- assertEquals("1", result.get(0));
- assertEquals("2", result.get(1));
- assertEquals("3", result.get(2));
- assertEquals("4", result.get(3));
- }
-
- @Test
- public void testConvertFP32() {
- int numElements = 4;
- ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES *
numElements);
- buffer.order(ByteOrder.nativeOrder());
- for(float i = 1.1f; i <= numElements + 1; i += 1.0) {
- buffer.putFloat(i);
- }
- Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.FP32);
- assertNotNull(result);
- assertEquals(4, result.size());
- assertEquals(1.1f, result.get(0));
- assertEquals(2.1f, result.get(1));
- assertEquals(3.1f, result.get(2));
- assertEquals(4.1f, result.get(3));
- }
-
- @Test
- public void testConvertFP64() {
- int numElements = 4;
- ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES *
numElements);
- buffer.order(ByteOrder.nativeOrder());
- for(double i = 1.1; i <= numElements + 1; i += 1.0) {
- buffer.putDouble(i);
- }
- Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.FP64);
- assertNotNull(result);
- assertEquals(4, result.size());
- assertEquals(1.1, result.get(0));
- assertEquals(2.1, result.get(1));
- assertEquals(3.1, result.get(2));
- assertEquals(4.1, result.get(3));
- }
-
- @Test
- public void testConvertBoolean() {
- int numElements = 4;
- byte[] data = {1, 0, 1, 0};
- Array<?> result = Py4jConverterUtils.convert(data, numElements,
Types.ValueType.BOOLEAN);
- assertNotNull(result);
- assertEquals(4, result.size());
- assertEquals(true, result.get(0));
- assertEquals(false, result.get(1));
- assertEquals(true, result.get(2));
- assertEquals(false, result.get(3));
- }
-
- @Test
- public void testConvertString() {
- int numElements = 2;
- String[] strings = {"hello", "world"};
- ByteBuffer buffer = ByteBuffer.allocate(4 + strings[0].length()
+ 4 + strings[1].length());
- buffer.order(ByteOrder.LITTLE_ENDIAN);
- for(String s : strings) {
- buffer.putInt(s.length());
- buffer.put(s.getBytes(StandardCharsets.UTF_8));
- }
- Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.STRING);
- assertNotNull(result);
- assertEquals(2, result.size());
- assertEquals("hello", result.get(0));
- assertEquals("world", result.get(1));
- }
-
- @Test
- public void testConvertChar() {
- char[] c = {'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l',
'd'};
- ByteBuffer buffer = ByteBuffer.allocate(Character.BYTES *
c.length);
- buffer.order(ByteOrder.LITTLE_ENDIAN);
- for(char s : c) {
- buffer.putChar(s);
- }
- Array<?> result = Py4jConverterUtils.convert(buffer.array(),
c.length, Types.ValueType.CHARACTER);
- assertNotNull(result);
- assertEquals(c.length, result.size());
-
- for(int i = 0; i < c.length; i++) {
- assertEquals(c[i], result.get(i));
- }
- }
-
- @Test
- public void testConvertRow() {
- int numElements = 4;
- byte[] data = {1, 2, 3, 4};
- Object[] row = Py4jConverterUtils.convertRow(data, numElements,
Types.ValueType.UINT8);
- assertNotNull(row);
- assertEquals(4, row.length);
- assertEquals(1, row[0]);
- assertEquals(2, row[1]);
- assertEquals(3, row[2]);
- assertEquals(4, row[3]);
- }
-
- @Test
- public void testConvertFused() {
- int numElements = 1;
- byte[] data = {1, 2, 3, 4};
- Types.ValueType[] valueTypes = {ValueType.UINT8,
ValueType.UINT8, ValueType.UINT8, ValueType.UINT8};
- Array<?>[] arrays = Py4jConverterUtils.convertFused(data,
numElements, valueTypes);
- assertNotNull(arrays);
- assertEquals(4, arrays.length);
- for(int i = 0; i < 4; i++) {
- assertEquals(1 + i, arrays[i].get(0));
- }
- }
-
- @Test(expected = Exception.class)
- public void nullData() {
- Py4jConverterUtils.convert(null, 14, ValueType.BOOLEAN);
- }
-
- @Test(expected = Exception.class)
- public void nullValueType() {
- Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, null);
- }
-
- @Test(expected = Exception.class)
- public void unknownValueType() {
- Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14,
ValueType.UNKNOWN);
- }
-}
diff --git
a/src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java
b/src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java
new file mode 100644
index 0000000000..2399f695a7
--- /dev/null
+++
b/src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.Py4jConverterUtils;
+import org.junit.Test;
+
+public class Py4jConverterUtilsTest {
+
+ @Test
+ public void testConvertUINT8() {
+ int numElements = 4;
+ byte[] data = {1, 2, 3, 4};
+ Array<?> result = Py4jConverterUtils.convert(data, numElements,
Types.ValueType.UINT8);
+ assertNotNull(result);
+ assertEquals(4, result.size());
+ assertEquals(1, result.get(0));
+ assertEquals(2, result.get(1));
+ assertEquals(3, result.get(2));
+ assertEquals(4, result.get(3));
+ }
+
+ @Test
+ public void testConvertINT32() {
+ int numElements = 4;
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES *
numElements);
+ buffer.order(ByteOrder.nativeOrder());
+ for(int i = 1; i <= numElements; i++) {
+ buffer.putInt(i);
+ }
+ Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.INT32);
+ assertNotNull(result);
+ assertEquals(4, result.size());
+ assertEquals(1, result.get(0));
+ assertEquals(2, result.get(1));
+ assertEquals(3, result.get(2));
+ assertEquals(4, result.get(3));
+ }
+
+ @Test
+ public void testConvertINT64() {
+ int numElements = 4;
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES *
numElements);
+ buffer.order(ByteOrder.nativeOrder());
+ for(int i = 1; i <= numElements; i++) {
+ buffer.putLong((long) i);
+ }
+ Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.INT64);
+ assertNotNull(result);
+ assertEquals(4, result.size());
+ assertEquals(1L, result.get(0));
+ assertEquals(2L, result.get(1));
+ assertEquals(3L, result.get(2));
+ assertEquals(4L, result.get(3));
+ }
+
+
+ @Test
+ public void testConvertHASH32() {
+ int numElements = 4;
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES *
numElements);
+ buffer.order(ByteOrder.nativeOrder());
+ for(int i = 1; i <= numElements; i++) {
+ buffer.putInt(i);
+ }
+ Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.HASH32);
+ assertNotNull(result);
+ assertEquals(4, result.size());
+ assertEquals("1", result.get(0));
+ assertEquals("2", result.get(1));
+ assertEquals("3", result.get(2));
+ assertEquals("4", result.get(3));
+ }
+
+ @Test
+ public void testConvertHASH64() {
+ int numElements = 4;
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES *
numElements);
+ buffer.order(ByteOrder.nativeOrder());
+ for(int i = 1; i <= numElements; i++) {
+ buffer.putLong((long) i);
+ }
+ Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.HASH64);
+ assertNotNull(result);
+ assertEquals(4, result.size());
+ assertEquals("1", result.get(0));
+ assertEquals("2", result.get(1));
+ assertEquals("3", result.get(2));
+ assertEquals("4", result.get(3));
+ }
+
+ @Test
+ public void testConvertFP32() {
+ int numElements = 4;
+ ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES *
numElements);
+ buffer.order(ByteOrder.nativeOrder());
+ for(float i = 1.1f; i <= numElements + 1; i += 1.0) {
+ buffer.putFloat(i);
+ }
+ Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.FP32);
+ assertNotNull(result);
+ assertEquals(4, result.size());
+ assertEquals(1.1f, result.get(0));
+ assertEquals(2.1f, result.get(1));
+ assertEquals(3.1f, result.get(2));
+ assertEquals(4.1f, result.get(3));
+ }
+
+ @Test
+ public void testConvertFP64() {
+ int numElements = 4;
+ ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES *
numElements);
+ buffer.order(ByteOrder.nativeOrder());
+ for(double i = 1.1; i <= numElements + 1; i += 1.0) {
+ buffer.putDouble(i);
+ }
+ Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.FP64);
+ assertNotNull(result);
+ assertEquals(4, result.size());
+ assertEquals(1.1, result.get(0));
+ assertEquals(2.1, result.get(1));
+ assertEquals(3.1, result.get(2));
+ assertEquals(4.1, result.get(3));
+ }
+
+ @Test
+ public void testConvertBoolean() {
+ int numElements = 4;
+ byte[] data = {1, 0, 1, 0};
+ Array<?> result = Py4jConverterUtils.convert(data, numElements,
Types.ValueType.BOOLEAN);
+ assertNotNull(result);
+ assertEquals(4, result.size());
+ assertEquals(true, result.get(0));
+ assertEquals(false, result.get(1));
+ assertEquals(true, result.get(2));
+ assertEquals(false, result.get(3));
+ }
+
+ @Test
+ public void testConvertString() {
+ int numElements = 2;
+ String[] strings = {"hello", "world"};
+ ByteBuffer buffer = ByteBuffer.allocate(4 + strings[0].length()
+ 4 + strings[1].length());
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+ for(String s : strings) {
+ buffer.putInt(s.length());
+ buffer.put(s.getBytes(StandardCharsets.UTF_8));
+ }
+ Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.STRING);
+ assertNotNull(result);
+ assertEquals(2, result.size());
+ assertEquals("hello", result.get(0));
+ assertEquals("world", result.get(1));
+ }
+
+ @Test
+ public void testConvertChar() {
+ char[] c = {'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l',
'd'};
+ ByteBuffer buffer = ByteBuffer.allocate(Character.BYTES *
c.length);
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+ for(char s : c) {
+ buffer.putChar(s);
+ }
+ Array<?> result = Py4jConverterUtils.convert(buffer.array(),
c.length, Types.ValueType.CHARACTER);
+ assertNotNull(result);
+ assertEquals(c.length, result.size());
+
+ for(int i = 0; i < c.length; i++) {
+ assertEquals(c[i], result.get(i));
+ }
+ }
+
+ @Test
+ public void testConvertRow() {
+ int numElements = 4;
+ byte[] data = {1, 2, 3, 4};
+ Object[] row = Py4jConverterUtils.convertRow(data, numElements,
Types.ValueType.UINT8);
+ assertNotNull(row);
+ assertEquals(4, row.length);
+ assertEquals(1, row[0]);
+ assertEquals(2, row[1]);
+ assertEquals(3, row[2]);
+ assertEquals(4, row[3]);
+ }
+
+ @Test
+ public void testConvertFused() {
+ int numElements = 1;
+ byte[] data = {1, 2, 3, 4};
+ Types.ValueType[] valueTypes = {ValueType.UINT8,
ValueType.UINT8, ValueType.UINT8, ValueType.UINT8};
+ Array<?>[] arrays = Py4jConverterUtils.convertFused(data,
numElements, valueTypes);
+ assertNotNull(arrays);
+ assertEquals(4, arrays.length);
+ for(int i = 0; i < 4; i++) {
+ assertEquals(1 + i, arrays[i].get(0));
+ }
+ }
+
+ @Test(expected = Exception.class)
+ public void nullData() {
+ Py4jConverterUtils.convert(null, 14, ValueType.BOOLEAN);
+ }
+
+ @Test(expected = Exception.class)
+ public void nullValueType() {
+ Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, null);
+ }
+
+ @Test(expected = Exception.class)
+ public void unknownValueType() {
+ Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14,
ValueType.UNKNOWN);
+ }
+
+ @Test
+ public void testConvertPy4JArrayToMBFP64() {
+ int rlen = 2;
+ int clen = 3;
+ ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * rlen *
clen);
+ buffer.order(ByteOrder.nativeOrder());
+ double[] values = {1.1, 2.2, 3.3, 4.4, 5.5, 6.6};
+ for(double val : values) {
+ buffer.putDouble(val);
+ }
+ MatrixBlock mb =
Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen);
+ assertNotNull(mb);
+ assertEquals(rlen, mb.getNumRows());
+ assertEquals(clen, mb.getNumColumns());
+ assertEquals(1.1, mb.get(0, 0), 0.0001);
+ assertEquals(2.2, mb.get(0, 1), 0.0001);
+ assertEquals(3.3, mb.get(0, 2), 0.0001);
+ assertEquals(4.4, mb.get(1, 0), 0.0001);
+ assertEquals(5.5, mb.get(1, 1), 0.0001);
+ assertEquals(6.6, mb.get(1, 2), 0.0001);
+ }
+
+ @Test
+ public void testConvertPy4JArrayToMBUINT8() {
+ int rlen = 2;
+ int clen = 2;
+ byte[] data = {(byte) 1, (byte) 2, (byte) 3, (byte) 4};
+ MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(data,
rlen, clen, ValueType.UINT8);
+ assertNotNull(mb);
+ assertEquals(rlen, mb.getNumRows());
+ assertEquals(clen, mb.getNumColumns());
+ assertEquals(1.0, mb.get(0, 0), 0.0001);
+ assertEquals(2.0, mb.get(0, 1), 0.0001);
+ assertEquals(3.0, mb.get(1, 0), 0.0001);
+ assertEquals(4.0, mb.get(1, 1), 0.0001);
+ }
+
+ @Test
+ public void testConvertPy4JArrayToMBINT32() {
+ int rlen = 2;
+ int clen = 2;
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * rlen *
clen);
+ buffer.order(ByteOrder.nativeOrder());
+ int[] values = {10, 20, 30, 40};
+ for(int val : values) {
+ buffer.putInt(val);
+ }
+ MatrixBlock mb =
Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen,
ValueType.INT32);
+ assertNotNull(mb);
+ assertEquals(rlen, mb.getNumRows());
+ assertEquals(clen, mb.getNumColumns());
+ assertEquals(10.0, mb.get(0, 0), 0.0001);
+ assertEquals(20.0, mb.get(0, 1), 0.0001);
+ assertEquals(30.0, mb.get(1, 0), 0.0001);
+ assertEquals(40.0, mb.get(1, 1), 0.0001);
+ }
+
+ @Test
+ public void testConvertPy4JArrayToMBFP32() {
+ int rlen = 2;
+ int clen = 2;
+ ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES * rlen *
clen);
+ buffer.order(ByteOrder.nativeOrder());
+ float[] values = {1.5f, 2.5f, 3.5f, 4.5f};
+ for(float val : values) {
+ buffer.putFloat(val);
+ }
+ MatrixBlock mb =
Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen,
ValueType.FP32);
+ assertNotNull(mb);
+ assertEquals(rlen, mb.getNumRows());
+ assertEquals(clen, mb.getNumColumns());
+ assertEquals(1.5f, mb.get(0, 0), 0.0001);
+ assertEquals(2.5f, mb.get(0, 1), 0.0001);
+ assertEquals(3.5f, mb.get(1, 0), 0.0001);
+ assertEquals(4.5f, mb.get(1, 1), 0.0001);
+ }
+
+ @Test(expected = DMLRuntimeException.class)
+ public void testConvertPy4JArrayToMBSparseNotSupported() {
+ int rlen = 2;
+ int clen = 2;
+ byte[] data = {1, 2, 3, 4};
+ Py4jConverterUtils.convertPy4JArrayToMB(data, rlen, clen, true,
ValueType.UINT8);
+ }
+
+ @Test
+ public void testConvertSciPyCOOToMB() {
+ int rlen = 10;
+ int clen = 10;
+ int nnz = 3;
+ // Create COO format: values at (0,0)=1.0, (1,2)=2.0, (2,1)=3.0
+ ByteBuffer dataBuf = ByteBuffer.allocate(Double.BYTES * nnz);
+ dataBuf.order(ByteOrder.nativeOrder());
+ dataBuf.putDouble(1.0);
+ dataBuf.putDouble(2.0);
+ dataBuf.putDouble(3.0);
+
+ ByteBuffer rowBuf = ByteBuffer.allocate(Integer.BYTES * nnz);
+ rowBuf.order(ByteOrder.nativeOrder());
+ rowBuf.putInt(0);
+ rowBuf.putInt(1);
+ rowBuf.putInt(2);
+
+ ByteBuffer colBuf = ByteBuffer.allocate(Integer.BYTES * nnz);
+ colBuf.order(ByteOrder.nativeOrder());
+ colBuf.putInt(0);
+ colBuf.putInt(2);
+ colBuf.putInt(1);
+
+ MatrixBlock mb = Py4jConverterUtils.convertSciPyCOOToMB(
+ dataBuf.array(), rowBuf.array(), colBuf.array(), rlen,
clen, nnz);
+ assertNotNull(mb);
+ assertEquals(rlen, mb.getNumRows());
+ assertEquals(clen, mb.getNumColumns());
+ assertTrue(mb.isInSparseFormat());
+ assertEquals(1.0, mb.get(0, 0), 0.0001);
+ assertEquals(2.0, mb.get(1, 2), 0.0001);
+ assertEquals(3.0, mb.get(2, 1), 0.0001);
+ assertEquals(0.0, mb.get(0, 1), 0.0001);
+ assertEquals(0.0, mb.get(1, 0), 0.0001);
+ }
+
+ @Test
+ public void testConvertSciPyCSRToMB() {
+ int rlen = 10;
+ int clen = 10;
+ int nnz = 3;
+ // Create CSR format: values at (0,0)=1.0, (1,2)=2.0, (2,1)=3.0
+ ByteBuffer dataBuf = ByteBuffer.allocate(Double.BYTES * nnz);
+ dataBuf.order(ByteOrder.nativeOrder());
+ dataBuf.putDouble(1.0);
+ dataBuf.putDouble(2.0);
+ dataBuf.putDouble(3.0);
+
+ ByteBuffer indicesBuf = ByteBuffer.allocate(Integer.BYTES *
nnz);
+ indicesBuf.order(ByteOrder.nativeOrder());
+ indicesBuf.putInt(0); // column for row 0
+ indicesBuf.putInt(2); // column for row 1
+ indicesBuf.putInt(1); // column for row 2
+
+ ByteBuffer indptrBuf = ByteBuffer.allocate(Integer.BYTES *
(rlen + 1));
+ indptrBuf.order(ByteOrder.nativeOrder());
+ indptrBuf.putInt(0); // row 0 starts at index 0
+ indptrBuf.putInt(1); // row 1 starts at index 1
+ indptrBuf.putInt(2); // row 2 starts at index 2
+ indptrBuf.putInt(3); // end marker
+
+ MatrixBlock mb = Py4jConverterUtils.convertSciPyCSRToMB(
+ dataBuf.array(), indicesBuf.array(), indptrBuf.array(),
rlen, clen, nnz);
+ assertNotNull(mb);
+ assertEquals(rlen, mb.getNumRows());
+ assertEquals(clen, mb.getNumColumns());
+ assertTrue(mb.isInSparseFormat());
+ assertEquals(1.0, mb.get(0, 0), 0.0001);
+ assertEquals(2.0, mb.get(1, 2), 0.0001);
+ assertEquals(3.0, mb.get(2, 1), 0.0001);
+ assertEquals(0.0, mb.get(0, 1), 0.0001);
+ assertEquals(0.0, mb.get(1, 0), 0.0001);
+ }
+
+ @Test
+ public void testAllocateDenseOrSparseDense() {
+ int rlen = 5;
+ int clen = 5;
+ MatrixBlock mb = Py4jConverterUtils.allocateDenseOrSparse(rlen,
clen, false);
+ assertNotNull(mb);
+ assertEquals(rlen, mb.getNumRows());
+ assertEquals(clen, mb.getNumColumns());
+ assertTrue(!mb.isInSparseFormat());
+ }
+
+ @Test
+ public void testAllocateDenseOrSparseSparse() {
+ int rlen = 5;
+ int clen = 5;
+ MatrixBlock mb = Py4jConverterUtils.allocateDenseOrSparse(rlen,
clen, true);
+ assertNotNull(mb);
+ assertEquals(rlen, mb.getNumRows());
+ assertEquals(clen, mb.getNumColumns());
+ assertTrue(mb.isInSparseFormat());
+ }
+
+ @Test
+ public void testAllocateDenseOrSparseLong() {
+ long rlen = 10L;
+ long clen = 10L;
+ MatrixBlock mb = Py4jConverterUtils.allocateDenseOrSparse(rlen,
clen, false);
+ assertNotNull(mb);
+ assertEquals((int) rlen, mb.getNumRows());
+ assertEquals((int) clen, mb.getNumColumns());
+ }
+
+ @Test(expected = DMLRuntimeException.class)
+ public void testAllocateDenseOrSparseLongTooLarge() {
+ long rlen = Integer.MAX_VALUE + 1L;
+ long clen = 10L;
+ Py4jConverterUtils.allocateDenseOrSparse(rlen, clen, false);
+ }
+
+ @Test
+ public void testConvertMBtoPy4JDenseArr() {
+ int rlen = 2;
+ int clen = 2;
+ MatrixBlock mb = new MatrixBlock(rlen, clen, false);
+ mb.allocateBlock();
+ mb.set(0, 0, 1.0);
+ mb.set(0, 1, 2.0);
+ mb.set(1, 0, 3.0);
+ mb.set(1, 1, 4.0);
+
+ byte[] result = Py4jConverterUtils.convertMBtoPy4JDenseArr(mb);
+ assertNotNull(result);
+ assertEquals(Double.BYTES * rlen * clen, result.length);
+
+ ByteBuffer buffer = ByteBuffer.wrap(result);
+ buffer.order(ByteOrder.nativeOrder());
+ assertEquals(1.0, buffer.getDouble(), 0.0001);
+ assertEquals(2.0, buffer.getDouble(), 0.0001);
+ assertEquals(3.0, buffer.getDouble(), 0.0001);
+ assertEquals(4.0, buffer.getDouble(), 0.0001);
+ }
+
+ @Test
+ public void testConvertMBtoPy4JDenseArrRoundTrip() {
+ int rlen = 2;
+ int clen = 3;
+ ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * rlen *
clen);
+ buffer.order(ByteOrder.nativeOrder());
+ double[] values = {1.1, 2.2, 3.3, 4.4, 5.5, 6.6};
+ for(double val : values) {
+ buffer.putDouble(val);
+ }
+
+ MatrixBlock mb =
Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen);
+ byte[] result = Py4jConverterUtils.convertMBtoPy4JDenseArr(mb);
+
+ ByteBuffer resultBuffer = ByteBuffer.wrap(result);
+ resultBuffer.order(ByteOrder.nativeOrder());
+ for(double expected : values) {
+ assertEquals(expected, resultBuffer.getDouble(),
0.0001);
+ }
+ }
+
+ @Test
+ public void testConvertMBtoPy4JDenseArrSparseToDense() {
+ new Py4jConverterUtils();
+ int rlen = 3;
+ int clen = 3;
+ MatrixBlock mb = new MatrixBlock(rlen, clen, true);
+ mb.allocateSparseRowsBlock(false);
+ mb.set(0, 0, 1.0);
+ mb.set(2, 2, 2.0);
+
+ byte[] result = Py4jConverterUtils.convertMBtoPy4JDenseArr(mb);
+ assertNotNull(result);
+ assertEquals(Double.BYTES * rlen * clen, result.length);
+
+ ByteBuffer buffer = ByteBuffer.wrap(result);
+ buffer.order(ByteOrder.nativeOrder());
+ assertEquals(1.0, buffer.getDouble(), 0.0001);
+ // Skip to position (2,2) = index 8
+ for(int i = 1; i < 8; i++) {
+ assertEquals(0.0, buffer.getDouble(), 0.0001);
+ }
+ assertEquals(2.0, buffer.getDouble(), 0.0001);
+ }
+}