This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new bea9c96fc1 [SYSTEMDS-3548] Optimize python dataframe transfer
bea9c96fc1 is described below
commit bea9c96fc1acae107498982724e4a739c8c98d74
Author: Nakroma <[email protected]>
AuthorDate: Wed Feb 5 22:38:35 2025 +0100
[SYSTEMDS-3548] Optimize python dataframe transfer
This commit optimizes how the pandas_to_frame_block function accesses Java
types.
It also fixes a small regression where exceptions raised in the parallelization
threads were not propagated properly.
- Fix perftests not working with large, split-up datasets. The IO datagen splits
large datasets into multiple files (for example 100k_1k); this commit makes
load_pandas.py and load_numpy.py able to read those.
- Add row-wise parallel processing to the pandas-to-FrameBlock conversion for
the case where cols > rows and all columns share the same dtype. It also adds
some other small, unused utility methods.
- Add javadocs
- Adjust Py4jConverterUtilsTest to reflect the code changes in the main
class.
- Add missing tests for the code added in SYSTEMDS-3548. This includes the
FrameBlock and Py4jConverterUtils functions, as well as Python pandas-to-SystemDS
IO end-to-end tests.
- Fix pandas io test (rows have to be >4)
Closes #2189
---
scripts/perftest/python/io/load_numpy.py | 7 +-
scripts/perftest/python/io/load_pandas.py | 7 +-
.../sysds/runtime/frame/data/FrameBlock.java | 60 ++++++++++
.../sysds/runtime/util/Py4jConverterUtils.java | 114 +++++++++---------
src/main/python/systemds/utils/converters.py | 133 +++++++++++++--------
.../tests/iotests/test_io_pandas_systemds.py | 58 +++++----
.../test/component/frame/FrameGetSetTest.java | 43 +++++++
.../frame/array/Py4jConverterUtilsTest.java | 28 ++++-
8 files changed, 321 insertions(+), 129 deletions(-)
diff --git a/scripts/perftest/python/io/load_numpy.py
b/scripts/perftest/python/io/load_numpy.py
index 11622b3722..3395bbffc5 100644
--- a/scripts/perftest/python/io/load_numpy.py
+++ b/scripts/perftest/python/io/load_numpy.py
@@ -28,7 +28,12 @@ setup = "\n".join(
[
"from systemds.script_building.script import DMLScript",
"import numpy as np",
- "array = np.loadtxt(src, delimiter=',')",
+ "import os",
+ "if os.path.isdir(src):",
+ " files = [os.path.join(src, f) for f in os.listdir(src)]",
+ " array = np.concatenate([np.loadtxt(f, delimiter=',') for f in
files])",
+ "else:",
+ " array = np.loadtxt(src, delimiter=',')",
"if dtype is not None:",
" array = array.astype(dtype)",
]
diff --git a/scripts/perftest/python/io/load_pandas.py
b/scripts/perftest/python/io/load_pandas.py
index 60cb46f0cc..c6bd4e182f 100644
--- a/scripts/perftest/python/io/load_pandas.py
+++ b/scripts/perftest/python/io/load_pandas.py
@@ -27,7 +27,12 @@ setup = "\n".join(
[
"from systemds.script_building.script import DMLScript",
"import pandas as pd",
- "df = pd.read_csv(src, header=None)",
+ "import os",
+ "if os.path.isdir(src):",
+ " files = [os.path.join(src, f) for f in os.listdir(src)]",
+ " df = pd.concat([pd.read_csv(f, header=None) for f in files])",
+ "else:",
+ " df = pd.read_csv(src, header=None)",
"if dtype is not None:",
" df = df.astype(dtype)",
]
diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
index f8c90fcdee..63cadb43cf 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
@@ -555,6 +555,17 @@ public class FrameBlock implements CacheBlock<FrameBlock>,
Externalizable {
reset(0, true);
}
+ /**
+ * Sets row at position r to the input array of objects, corresponding
to the schema.
+ * @param r row index
+ * @param row array of objects
+ */
+ public void setRow(int r, Object[] row) {
+ for (int i = 0; i < row.length; i++) {
+ set(r, i, row[i]);
+ }
+ }
+
/**
* Append a row to the end of the data frame, where all row fields are
boxed objects according to the schema.
*
@@ -753,6 +764,55 @@ public class FrameBlock implements CacheBlock<FrameBlock>,
Externalizable {
_msize = -1;
}
+ /**
+ * Appends a chunk of data to the end of a specified column.
+ *
+ * @param c column index
+ * @param chunk chunk of data to append
+ */
+ public void appendColumnChunk(int c, Array<?> chunk) {
+ if (_coldata == null) {
+ _coldata = new Array[getNumColumns()];
+ }
+
+ if (_coldata[c] == null) {
+ _coldata[c] = chunk;
+ _nRow = chunk.size();
+ } else {
+ _coldata[c] = ArrayFactory.append(_coldata[c], chunk);
+ _nRow += chunk.size();
+ }
+
+ _msize = -1;
+ }
+
+ /**
+ * Sets a chunk of data to a specified column, starting at the
specified offset.
+ *
+ * @param c column index
+ * @param chunk chunk of data to set
+ * @param offset offset position where it should set the chunk
+ * @param colSize size of columns, in case columns aren't initialized
yet
+ */
+ public void setColumnChunk(int c, Array<?> chunk, int offset, int
colSize) {
+ if (_coldata == null) {
+ _coldata = new Array[getNumColumns()];
+ _nRow = colSize;
+ }
+
+ if (_coldata[c] == null) {
+ _coldata[c] =
ArrayFactory.allocate(chunk.getValueType(), _nRow);
+ }
+
+ if (_coldata[c].getValueType() != chunk.getValueType()) {
+ throw new DMLRuntimeException("ValueType mismatch in
setColumnChunk: expected " +
+ _coldata[c].getValueType() + " but got
" + chunk.getValueType());
+ }
+
+ ArrayFactory.set(_coldata[c], chunk, offset, offset +
chunk.size() - 1, _nRow);
+ }
+
+
@Override
public void write(DataOutput out) throws IOException {
final boolean isDefaultMeta = isColNamesDefault() &&
isColumnMetadataDefault();
diff --git
a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
index 75c03b31a1..7faee722d0 100644
--- a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
@@ -128,36 +128,60 @@ public class Py4jConverterUtils {
buffer.order(ByteOrder.LITTLE_ENDIAN);
Array<?> array = ArrayFactory.allocate(valueType, numElements);
+ readBufferIntoArray(buffer, array, valueType, numElements);
- // Process the data based on the value type
- switch(valueType) {
- case UINT8:
- for(int i = 0; i < numElements; i++) {
+ return array;
+ }
+
+ // Right now row conversion is only supported for if all columns have
the same datatype, so this is a placeholder for now that essentially just casts
to Object[]
+ public static Object[] convertRow(byte[] data, int numElements,
Types.ValueType valueType) {
+ Array<?> converted = convert(data, numElements, valueType);
+
+ Object[] row = new Object[numElements];
+ for(int i = 0; i < numElements; i++) {
+ row[i] = converted.get(i);
+ }
+
+ return row;
+ }
+
+ public static Array<?>[] convertFused(byte[] data, int numElements,
Types.ValueType[] valueTypes) {
+ int numOperations = valueTypes.length;
+
+ ByteBuffer buffer = ByteBuffer.wrap(data);
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+ Array<?>[] arrays = new Array<?>[numOperations];
+
+ for (int i = 0; i < numOperations; i++) {
+ arrays[i] = ArrayFactory.allocate(valueTypes[i],
numElements);
+ readBufferIntoArray(buffer, arrays[i], valueTypes[i],
numElements);
+ }
+
+ return arrays;
+ }
+
+ private static void readBufferIntoArray(ByteBuffer buffer, Array<?>
array, Types.ValueType valueType, int numElements) {
+ for (int i = 0; i < numElements; i++) {
+ switch (valueType) {
+ case UINT8:
array.set(i, (int) (buffer.get() &
0xFF));
- }
- break;
- case INT32:
- for(int i = 0; i < numElements; i++) {
- array.set(i, buffer.getInt());
- }
- break;
- case INT64:
- for(int i = 0; i < numElements; i++) {
- array.set(i, buffer.getLong());
- }
- break;
- case FP32:
- for(int i = 0; i < numElements; i++) {
+ break;
+ case INT32:
+ case HASH32:
+ array.set(i, buffer.getInt());
+ break;
+ case INT64:
+ case HASH64:
+ array.set(i, buffer.getLong());
+ break;
+ case FP32:
array.set(i, buffer.getFloat());
- }
- break;
- case FP64:
- for(int i = 0; i < numElements; i++) {
+ break;
+ case FP64:
array.set(i, buffer.getDouble());
- }
- break;
- case BOOLEAN:
- for(int i = 0; i < numElements; i++) {
+ break;
+ case BOOLEAN:
if (array instanceof BooleanArray) {
((BooleanArray) array).set(i,
buffer.get() != 0);
} else if (array instanceof
BitSetArray) {
@@ -165,38 +189,20 @@ public class Py4jConverterUtils {
} else {
throw new
DMLRuntimeException("Array factory returned invalid array type for boolean
values.");
}
- }
- break;
- case STRING:
- for(int i = 0; i < numElements; i++) {
- buffer.order(ByteOrder.BIG_ENDIAN);
- int strLen = buffer.getInt();
- buffer.order(ByteOrder.LITTLE_ENDIAN);
- byte[] strBytes = new byte[strLen];
+ break;
+ case STRING:
+ int strLength = buffer.getInt();
+ byte[] strBytes = new byte[strLength];
buffer.get(strBytes);
array.set(i, new String(strBytes,
StandardCharsets.UTF_8));
- }
- break;
- case CHARACTER:
- for(int i = 0; i < numElements; i++) {
+ break;
+ case CHARACTER:
array.set(i, buffer.getChar());
- }
- break;
- case HASH32:
- for(int i = 0; i < numElements; i++) {
- array.set(i, buffer.getInt());
- }
- break;
- case HASH64:
- for(int i = 0; i < numElements; i++) {
- array.set(i, buffer.getLong());
- }
- break;
- default:
- throw new DMLRuntimeException("Unsupported
value type: " + valueType);
+ break;
+ default:
+ throw new
DMLRuntimeException("Unsupported value type: " + valueType);
+ }
}
-
- return array;
}
public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb) {
diff --git a/src/main/python/systemds/utils/converters.py
b/src/main/python/systemds/utils/converters.py
index 38fdab8ca7..61a4769e80 100644
--- a/src/main/python/systemds/utils/converters.py
+++ b/src/main/python/systemds/utils/converters.py
@@ -56,7 +56,7 @@ def numpy_to_matrix_block(sds, np_arr: np.array):
else:
arr = np_arr.ravel().astype(np.float64)
value_type = jvm.org.apache.sysds.common.Types.ValueType.FP64
- buf = bytearray(arr.tobytes())
+ buf = arr.tobytes()
# Send data to java.
try:
@@ -82,31 +82,38 @@ def matrix_block_to_numpy(jvm: JVMView, mb: JavaObject):
)
-def convert_column(jvm, rows, j, col_type, pd_col, fb, col_name):
- """Converts a given pandas column to a FrameBlock representation.
+def convert(jvm, fb, idx, num_elements, value_type, pd_series,
conversion="column"):
+ """Converts a given pandas column or row to a FrameBlock representation.
:param jvm: The JVMView of the current SystemDS context.
- :param rows: The number of rows in the pandas DataFrame.
- :param j: The current column index.
- :param col_type: The ValueType of the column.
- :param pd_col: The pandas column to convert.
+ :param fb: The FrameBlock to add the column to.
+ :param idx: The current column/row index.
+ :param num_elements: The number of rows/columns in the pandas DataFrame.
+ :param value_type: The ValueType of the column/row.
+ :param pd_series: The pandas column or row to convert.
+ :param conversion: The type of conversion to perform. Can be either
"column" or "row".
"""
- if col_type == jvm.org.apache.sysds.common.Types.ValueType.STRING:
+ if pd_series.dtype == "string" or pd_series.dtype == "object":
byte_data = bytearray()
- for value in pd_col.astype(str):
+ for value in pd_series.astype(str):
encoded_value = value.encode("utf-8")
- byte_data.extend(struct.pack(">I", len(encoded_value)))
+ byte_data.extend(struct.pack("<I", len(encoded_value)))
byte_data.extend(encoded_value)
else:
- col_data = pd_col.fillna("").to_numpy()
- byte_data = bytearray(col_data.tobytes())
-
- converted_array =
jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert(
- byte_data, rows, col_type
- )
-
- fb.setColumnName(j, str(col_name))
- fb.setColumn(j, converted_array)
+ byte_data = pd_series.fillna("").to_numpy().tobytes()
+
+ if conversion == "column":
+ converted_array =
jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert(
+ byte_data, num_elements, value_type
+ )
+ fb.setColumn(idx, converted_array)
+ elif conversion == "row":
+ converted_array = (
+ jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convertRow(
+ byte_data, num_elements, value_type
+ )
+ )
+ fb.setRow(idx, converted_array)
def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
@@ -121,58 +128,82 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
jvm: JVMView = sds.java_gateway.jvm
java_gate: JavaGateway = sds.java_gateway
+ jc_ValueType = jvm.org.apache.sysds.common.Types.ValueType
# pandas type mapping to systemds Valuetypes
data_type_mapping = {
- np.dtype(np.object_):
jvm.org.apache.sysds.common.Types.ValueType.STRING,
- np.dtype(np.int64): jvm.org.apache.sysds.common.Types.ValueType.INT64,
- np.dtype(np.float64): jvm.org.apache.sysds.common.Types.ValueType.FP64,
- np.dtype(np.bool_):
jvm.org.apache.sysds.common.Types.ValueType.BOOLEAN,
- np.dtype("<M8[ns]"):
jvm.org.apache.sysds.common.Types.ValueType.STRING,
- np.dtype(np.int32): jvm.org.apache.sysds.common.Types.ValueType.INT32,
- np.dtype(np.float32): jvm.org.apache.sysds.common.Types.ValueType.FP32,
- np.dtype(np.uint8): jvm.org.apache.sysds.common.Types.ValueType.UINT8,
- np.dtype(np.str_):
jvm.org.apache.sysds.common.Types.ValueType.CHARACTER,
+ "object": jc_ValueType.STRING,
+ "int64": jc_ValueType.INT64,
+ "float64": jc_ValueType.FP64,
+ "bool": jc_ValueType.BOOLEAN,
+ "string": jc_ValueType.STRING,
+ "int32": jc_ValueType.INT32,
+ "float32": jc_ValueType.FP32,
+ "uint8": jc_ValueType.UINT8,
}
- schema = []
- col_names = []
- for col_name, dtype in dict(pd_df.dtypes).items():
+ # schema and j_valueTypeArray are essentially doubled but accessing a Java
array is costly,
+ # while also being necessary for FrameBlock, so we create one for Python
and one for Java.
+ col_names = []
+ schema = []
+ j_valueTypeArray = java_gate.new_array(jc_ValueType, cols)
+ j_colNameArray = java_gate.new_array(jvm.java.lang.String, cols)
+ for i, (col_name, dtype) in enumerate(dict(pd_df.dtypes).items()):
+ j_colNameArray[i] = str(col_name)
col_names.append(col_name)
- if dtype in data_type_mapping.keys():
- schema.append(data_type_mapping[dtype])
+ type_key = str(dtype)
+ if type_key in data_type_mapping:
+ schema.append(data_type_mapping[type_key])
+ j_valueTypeArray[i] = data_type_mapping[type_key]
else:
- schema.append(jvm.org.apache.sysds.common.Types.ValueType.STRING)
+ schema.append(jc_ValueType.STRING)
+ j_valueTypeArray[i] = jc_ValueType.STRING
+
try:
- jc_ValueType = jvm.org.apache.sysds.common.Types.ValueType
jc_String = jvm.java.lang.String
jc_FrameBlock = jvm.org.apache.sysds.runtime.frame.data.FrameBlock
- j_valueTypeArray = java_gate.new_array(jc_ValueType, len(schema))
-
# execution speed increases with optimized code when the number of
rows exceeds 4
if rows > 4:
- for i in range(len(schema)):
- j_valueTypeArray[i] = schema[i]
-
- fb = jc_FrameBlock(j_valueTypeArray, rows)
+ # Row conversion if more columns than rows and all columns have
the same type, otherwise column
+ conversion_type = (
+ "row" if cols > rows and len(set(pd_df.dtypes)) == 1 else
"column"
+ )
+ if conversion_type == "row":
+ pd_df = pd_df.transpose()
+ col_names = pd_df.columns.tolist() # re-calculate col names
+
+ fb = jc_FrameBlock(
+ j_valueTypeArray,
+ j_colNameArray,
+ rows if conversion_type == "column" else None,
+ )
+ if conversion_type == "row":
+ fb.ensureAllocatedColumns(rows)
+ # We use .submit() with explicit .result() calling to properly
propagate exceptions
with concurrent.futures.ThreadPoolExecutor() as executor:
- executor.map(
- lambda j, col_name: convert_column(
- jvm, rows, j, schema[j], pd_df[col_name], fb, col_name
- ),
- range(len(col_names)),
- col_names,
- )
+ futures = [
+ executor.submit(
+ convert,
+ jvm,
+ fb,
+ i,
+ rows if conversion_type == "column" else cols,
+ schema[i],
+ pd_df[col_name],
+ conversion_type,
+ )
+ for i, col_name in enumerate(col_names)
+ ]
+
+ for future in concurrent.futures.as_completed(futures):
+ future.result()
return fb
else:
j_dataArray = java_gate.new_array(jc_String, rows, cols)
- j_colNameArray = java_gate.new_array(jc_String, len(col_names))
for j, col_name in enumerate(col_names):
- j_valueTypeArray[j] = schema[j]
- j_colNameArray[j] = str(col_names[j])
col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)
for i in range(col_data.shape[0]):
diff --git a/src/main/python/tests/iotests/test_io_pandas_systemds.py
b/src/main/python/tests/iotests/test_io_pandas_systemds.py
index 0ddbf63a5d..7f599ea163 100644
--- a/src/main/python/tests/iotests/test_io_pandas_systemds.py
+++ b/src/main/python/tests/iotests/test_io_pandas_systemds.py
@@ -27,18 +27,22 @@ import pandas as pd
from systemds.context import SystemDSContext
+def create_dataframe(n_rows, n_cols, mixed=True):
+ return pd.DataFrame(
+ {
+ f"C{i+1}": [
+ f"col{i+1}_string_{j}" if i == 0 and mixed else j + i
+ for j in range(n_rows)
+ ]
+ for i in range(n_cols)
+ }
+ )
+
+
class TestPandasFromToSystemds(unittest.TestCase):
sds: SystemDSContext = None
temp_dir: str = "tests/iotests/temp_write_csv/"
- n_cols = 3
- n_rows = 5
- df = pd.DataFrame(
- {
- "C1": [f"col1_string_{i}" for i in range(n_rows)],
- "C2": [i for i in range(n_rows)],
- }
- )
@classmethod
def setUpClass(cls):
@@ -52,22 +56,36 @@ class TestPandasFromToSystemds(unittest.TestCase):
shutil.rmtree(cls.temp_dir, ignore_errors=True)
def test_into_systemds(self):
- # Transfer into SystemDS and write to CSV
- frame = self.sds.from_pandas(self.df)
- frame.write(
- self.temp_dir + "into_systemds.csv", format="csv", header=True
- ).compute(verbose=True)
+ combinations = [ # (n_rows, n_cols, mixed)
+ (3, 2, True), # Test un-parallelized code (rows <= 4)
+ (10, 5, True), # Test parallelized column-wise code
+ (5, 10, True), # Test parallelized column-wise mixed code
+ (5, 10, False), # Test parallelized row-wise code
+ ]
- # Read the CSV file using pandas
- result_df = pd.read_csv(self.temp_dir + "into_systemds.csv")
+ for n_rows, n_cols, mixed in combinations:
+ df = create_dataframe(n_rows, n_cols, mixed)
- # Verify the data
- self.assertTrue(isinstance(result_df, pd.DataFrame))
- self.assertTrue(self.df.equals(result_df))
+ # Transfer into SystemDS and write to CSV
+ frame = self.sds.from_pandas(df)
+ frame.write(
+ self.temp_dir + "into_systemds.csv", format="csv", header=True
+ ).compute(verbose=True)
+
+ # Read the CSV file using pandas
+ result_df = pd.read_csv(self.temp_dir + "into_systemds.csv")
+
+ # Verify the data
+ self.assertTrue(isinstance(result_df, pd.DataFrame))
+ self.assertTrue(df.equals(result_df))
def test_out_of_systemds(self):
+ n_rows = 3
+ n_cols = 2
+ df = create_dataframe(n_rows, n_cols)
+
# Create a CSV file to read into SystemDS
- self.df.to_csv(self.temp_dir + "out_of_systemds.csv", header=False,
index=False)
+ df.to_csv(self.temp_dir + "out_of_systemds.csv", header=False,
index=False)
# Read the CSV file into SystemDS and then compute back to pandas
frame = self.sds.read(
@@ -79,7 +97,7 @@ class TestPandasFromToSystemds(unittest.TestCase):
result_df["C2"] = result_df["C2"].astype(int)
self.assertTrue(isinstance(result_df, pd.DataFrame))
- self.assertTrue(self.df.equals(result_df))
+ self.assertTrue(df.equals(result_df))
if __name__ == "__main__":
diff --git
a/src/test/java/org/apache/sysds/test/component/frame/FrameGetSetTest.java
b/src/test/java/org/apache/sysds/test/component/frame/FrameGetSetTest.java
index 63f617711a..b5c57341e0 100644
--- a/src/test/java/org/apache/sysds/test/component/frame/FrameGetSetTest.java
+++ b/src/test/java/org/apache/sysds/test/component/frame/FrameGetSetTest.java
@@ -19,6 +19,10 @@
package org.apache.sysds.test.component.frame;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
import org.junit.Assert;
import org.junit.Test;
import org.apache.sysds.common.Types.ValueType;
@@ -171,4 +175,43 @@ public class FrameGetSetTest extends AutomatedTestBase
throw new RuntimeException(ex);
}
}
+
+
+ @Test
+ public void testSetRow() {
+ FrameBlock frame = new FrameBlock(schemaMixed, "0", rows);
+
+ frame.setRow(2, new Object[] {"2", 2.0, 2L, true});
+
+ assertEquals(frame.get(2, 0), "2");
+ assertEquals(frame.get(2, 1), 2.0);
+ assertEquals(frame.get(2, 2), 2L);
+ assertEquals(frame.get(2, 3), true);
+ }
+
+ @Test
+ public void testAppendColumnChunk() {
+ FrameBlock frame = new FrameBlock(schemaMixed, rows);
+
+ Array<?> chunk = ArrayFactory.create(new double[] {1.0, 2.0});
+ Array<?> chunk2 = ArrayFactory.create(new double[] {3.0, 4.0});
+ frame.appendColumnChunk(1, chunk);
+ frame.appendColumnChunk(1, chunk2);
+
+ assertEquals(frame.get(0, 1), 1.0);
+ assertEquals(frame.get(1, 1), 2.0);
+ assertEquals(frame.get(2, 1), 3.0);
+ assertEquals(frame.get(3, 1), 4.0);
+ }
+
+ @Test
+ public void testSetColumnChunk() {
+ FrameBlock frame = new FrameBlock(schemaMixed, "0", rows);
+
+ Array<?> chunk = ArrayFactory.create(new double[] {1.0, 2.0});
+ frame.setColumnChunk(1, chunk, 5, rows);
+
+ assertEquals(frame.get(5, 1), 1.0);
+ assertEquals(frame.get(6, 1), 2.0);
+ }
}
diff --git
a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
b/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
index 980165c3ab..466c3337d8 100644
---
a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
+++
b/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
@@ -170,9 +170,7 @@ public class Py4jConverterUtilsTest {
ByteBuffer buffer = ByteBuffer.allocate(4 + strings[0].length()
+ 4 + strings[1].length());
buffer.order(ByteOrder.LITTLE_ENDIAN);
for(String s : strings) {
- buffer.order(ByteOrder.BIG_ENDIAN);
buffer.putInt(s.length());
- buffer.order(ByteOrder.LITTLE_ENDIAN);
buffer.put(s.getBytes(StandardCharsets.UTF_8));
}
Array<?> result = Py4jConverterUtils.convert(buffer.array(),
numElements, Types.ValueType.STRING);
@@ -199,6 +197,32 @@ public class Py4jConverterUtilsTest {
}
}
+ @Test
+ public void testConvertRow() {
+ int numElements = 4;
+ byte[] data = {1, 2, 3, 4};
+ Object[] row = Py4jConverterUtils.convertRow(data, numElements,
Types.ValueType.UINT8);
+ assertNotNull(row);
+ assertEquals(4, row.length);
+ assertEquals(1, row[0]);
+ assertEquals(2, row[1]);
+ assertEquals(3, row[2]);
+ assertEquals(4, row[3]);
+ }
+
+ @Test
+ public void testConvertFused() {
+ int numElements = 1;
+ byte[] data = {1, 2, 3, 4};
+ Types.ValueType[] valueTypes = {ValueType.UINT8,
ValueType.UINT8, ValueType.UINT8, ValueType.UINT8};
+ Array<?>[] arrays = Py4jConverterUtils.convertFused(data,
numElements, valueTypes);
+ assertNotNull(arrays);
+ assertEquals(4, arrays.length);
+ for(int i = 0; i < 4; i++) {
+ assertEquals(1 + i, arrays[i].get(0));
+ }
+ }
+
@Test(expected = Exception.class)
public void nullData() {
Py4jConverterUtils.convert(null, 14, ValueType.BOOLEAN);