This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new bea9c96fc1 [SYSTEMDS-3548] Optimize python dataframe transfer
bea9c96fc1 is described below

commit bea9c96fc1acae107498982724e4a739c8c98d74
Author: Nakroma <[email protected]>
AuthorDate: Wed Feb 5 22:38:35 2025 +0100

    [SYSTEMDS-3548] Optimize python dataframe transfer
    
    This commit optimizes how the pandas_to_frame_block function accesses Java types.
    It also fixes a small regression where exceptions from the parallelization threads weren't propagated properly.
    
    - Fix perftests not working with large, split-up datasets. IO datagen splits large datasets into multiple files (for example 100k_1k). This commit makes load_pandas.py and load_numpy.py able to read those.
    - Add pandas to FrameBlock row-wise parallel processing in the case of cols > rows. It also adds some other small, currently unused utility methods.
    - Add javadocs
    - Adjust Py4jConverterUtilsTest to reflect the code changes in the main class.
    - Add missing tests for the code added in SYSTEMDS-3548. This includes the FrameBlock and Py4jConverterUtils functions, as well as Python pandas to SystemDS IO e2e tests.
    - Fix pandas IO test (rows have to be >4)
    
    Closes #2189
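    
    Background on the exception-propagation fix: executor.map() returns a lazy
    iterator, so exceptions raised in worker threads only surface once that
    iterator is consumed, and the previous code never consumed it. Submitting the
    conversions as futures and calling result() re-raises worker exceptions in
    the caller. A minimal, standalone sketch of the pattern (illustrative names
    only, not the SystemDS code itself):
    
        import concurrent.futures
    
        def convert(idx):
            # stand-in for the per-column/per-row conversion work
            if idx == 2:
                raise ValueError(f"conversion failed for index {idx}")
            return idx
    
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(convert, i) for i in range(4)]
            for future in concurrent.futures.as_completed(futures):
                future.result()  # re-raises the ValueError from the worker thread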
---
 scripts/perftest/python/io/load_numpy.py           |   7 +-
 scripts/perftest/python/io/load_pandas.py          |   7 +-
 .../sysds/runtime/frame/data/FrameBlock.java       |  60 ++++++++++
 .../sysds/runtime/util/Py4jConverterUtils.java     | 114 +++++++++---------
 src/main/python/systemds/utils/converters.py       | 133 +++++++++++++--------
 .../tests/iotests/test_io_pandas_systemds.py       |  58 +++++----
 .../test/component/frame/FrameGetSetTest.java      |  43 +++++++
 .../frame/array/Py4jConverterUtilsTest.java        |  28 ++++-
 8 files changed, 321 insertions(+), 129 deletions(-)

diff --git a/scripts/perftest/python/io/load_numpy.py b/scripts/perftest/python/io/load_numpy.py
index 11622b3722..3395bbffc5 100644
--- a/scripts/perftest/python/io/load_numpy.py
+++ b/scripts/perftest/python/io/load_numpy.py
@@ -28,7 +28,12 @@ setup = "\n".join(
     [
         "from systemds.script_building.script import DMLScript",
         "import numpy as np",
-        "array = np.loadtxt(src, delimiter=',')",
+        "import os",
+        "if os.path.isdir(src):",
+        "    files = [os.path.join(src, f) for f in os.listdir(src)]",
+        "    array = np.concatenate([np.loadtxt(f, delimiter=',') for f in files])",
+        "else:",
+        "    array = np.loadtxt(src, delimiter=',')",
         "if dtype is not None:",
         "    array = array.astype(dtype)",
     ]
diff --git a/scripts/perftest/python/io/load_pandas.py b/scripts/perftest/python/io/load_pandas.py
index 60cb46f0cc..c6bd4e182f 100644
--- a/scripts/perftest/python/io/load_pandas.py
+++ b/scripts/perftest/python/io/load_pandas.py
@@ -27,7 +27,12 @@ setup = "\n".join(
     [
         "from systemds.script_building.script import DMLScript",
         "import pandas as pd",
-        "df = pd.read_csv(src, header=None)",
+        "import os",
+        "if os.path.isdir(src):",
+        "    files = [os.path.join(src, f) for f in os.listdir(src)]",
+        "    df = pd.concat([pd.read_csv(f, header=None) for f in files])",
+        "else:",
+        "    df = pd.read_csv(src, header=None)",
         "if dtype is not None:",
         "    df = df.astype(dtype)",
     ]
diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
index f8c90fcdee..63cadb43cf 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
@@ -555,6 +555,17 @@ public class FrameBlock implements CacheBlock<FrameBlock>, Externalizable {
                reset(0, true);
        }
 
+       /**
+        * Sets row at position r to the input array of objects, corresponding to the schema.
+        * @param r       row index
+        * @param row array of objects
+        */
+       public void setRow(int r, Object[] row) {
+               for (int i = 0; i < row.length; i++) {
+                       set(r, i, row[i]);
+               }
+       }
+
        /**
         * Append a row to the end of the data frame, where all row fields are boxed objects according to the schema.
         *
@@ -753,6 +764,55 @@ public class FrameBlock implements CacheBlock<FrameBlock>, Externalizable {
                _msize = -1;
        }
 
+       /**
+        * Appends a chunk of data to the end of a specified column.
+        * 
+        * @param c     column index
+        * @param chunk chunk of data to append
+        */
+       public void appendColumnChunk(int c, Array<?> chunk) {
+               if (_coldata == null) {
+                       _coldata = new Array[getNumColumns()];
+               }
+
+               if (_coldata[c] == null) {
+                       _coldata[c] = chunk;
+                       _nRow = chunk.size();
+               } else {
+                       _coldata[c] = ArrayFactory.append(_coldata[c], chunk);
+                       _nRow += chunk.size();
+               }
+
+               _msize = -1;
+       }
+
+       /**
+        * Sets a chunk of data to a specified column, starting at the specified offset.
+        * 
+        * @param c               column index
+        * @param chunk   chunk of data to set
+        * @param offset  offset position where it should set the chunk
+        * @param colSize size of columns, in case columns aren't initialized yet
+        */
+       public void setColumnChunk(int c, Array<?> chunk, int offset, int colSize) {
+               if (_coldata == null) {
+                       _coldata = new Array[getNumColumns()];
+                       _nRow = colSize;
+               }
+
+               if (_coldata[c] == null) {
+                       _coldata[c] = ArrayFactory.allocate(chunk.getValueType(), _nRow);
+               }
+
+               if (_coldata[c].getValueType() != chunk.getValueType()) {
+                       throw new DMLRuntimeException("ValueType mismatch in setColumnChunk: expected " +
+                                       _coldata[c].getValueType() + " but got " + chunk.getValueType());
+               }
+
+               ArrayFactory.set(_coldata[c], chunk, offset, offset + chunk.size() - 1, _nRow);
+       }
+
+
        @Override
        public void write(DataOutput out) throws IOException {
                final boolean isDefaultMeta = isColNamesDefault() && isColumnMetadataDefault();
diff --git a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
index 75c03b31a1..7faee722d0 100644
--- a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
@@ -128,36 +128,60 @@ public class Py4jConverterUtils {
                buffer.order(ByteOrder.LITTLE_ENDIAN);
 
                Array<?> array = ArrayFactory.allocate(valueType, numElements);
+               readBufferIntoArray(buffer, array, valueType, numElements);
 
-               // Process the data based on the value type
-               switch(valueType) {
-                       case UINT8:
-                               for(int i = 0; i < numElements; i++) {
+               return array;
+       }
+
+       // Right now row conversion is only supported if all columns have the same datatype, so this is a placeholder for now that essentially just casts to Object[]
+       public static Object[] convertRow(byte[] data, int numElements, Types.ValueType valueType) {
+               Array<?> converted = convert(data, numElements, valueType);
+
+               Object[] row = new Object[numElements];
+               for(int i = 0; i < numElements; i++) {
+                       row[i] = converted.get(i);
+               }
+
+               return row;
+       }
+
+       public static Array<?>[] convertFused(byte[] data, int numElements, Types.ValueType[] valueTypes) {
+               int numOperations = valueTypes.length;
+
+               ByteBuffer buffer = ByteBuffer.wrap(data);
+               buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+               Array<?>[] arrays = new Array<?>[numOperations];
+
+               for (int i = 0; i < numOperations; i++) {
+                       arrays[i] = ArrayFactory.allocate(valueTypes[i], numElements);
+                       readBufferIntoArray(buffer, arrays[i], valueTypes[i], numElements);
+               }
+
+        return arrays;
+    }
+
+       private static void readBufferIntoArray(ByteBuffer buffer, Array<?> array, Types.ValueType valueType, int numElements) {
+               for (int i = 0; i < numElements; i++) {
+                       switch (valueType) {
+                               case UINT8:
                                        array.set(i, (int) (buffer.get() & 0xFF));
-                               }
-                               break;
-                       case INT32:
-                               for(int i = 0; i < numElements; i++) {
-                                       array.set(i, buffer.getInt());
-                               }
-                               break;
-                       case INT64:
-                               for(int i = 0; i < numElements; i++) {
-                                       array.set(i, buffer.getLong());
-                               }
-                               break;
-                       case FP32:
-                               for(int i = 0; i < numElements; i++) {
+                                       break;
+                               case INT32:
+                case HASH32:
+                    array.set(i, buffer.getInt());
+                                       break;
+                               case INT64:
+                case HASH64:
+                    array.set(i, buffer.getLong());
+                                       break;
+                               case FP32:
                                        array.set(i, buffer.getFloat());
-                               }
-                               break;
-                       case FP64:
-                               for(int i = 0; i < numElements; i++) {
+                                       break;
+                               case FP64:
                                        array.set(i, buffer.getDouble());
-                               }
-                               break;
-                       case BOOLEAN:
-                               for(int i = 0; i < numElements; i++) {
+                                       break;
+                               case BOOLEAN:
                                        if (array instanceof BooleanArray) {
                                                ((BooleanArray) array).set(i, buffer.get() != 0);
                                        } else if (array instanceof BitSetArray) {
@@ -165,38 +189,20 @@ public class Py4jConverterUtils {
                                        } else {
                                                throw new DMLRuntimeException("Array factory returned invalid array type for boolean values.");
                                        }
-                               }
-                               break;
-                       case STRING:
-                               for(int i = 0; i < numElements; i++) {
-                                       buffer.order(ByteOrder.BIG_ENDIAN);
-                                       int strLen = buffer.getInt();
-                                       buffer.order(ByteOrder.LITTLE_ENDIAN);
-                                       byte[] strBytes = new byte[strLen];
+                                       break;
+                               case STRING:
+                                       int strLength = buffer.getInt();
+                                       byte[] strBytes = new byte[strLength];
                                        buffer.get(strBytes);
                                        array.set(i, new String(strBytes, StandardCharsets.UTF_8));
-                               }
-                               break;
-                       case CHARACTER:
-                               for(int i = 0; i < numElements; i++) {
+                                       break;
+                               case CHARACTER:
                                        array.set(i, buffer.getChar());
-                               }
-                               break;
-                       case HASH32:
-                               for(int i = 0; i < numElements; i++) {
-                                       array.set(i, buffer.getInt());
-                               }
-                               break;
-                       case HASH64:
-                               for(int i = 0; i < numElements; i++) {
-                                       array.set(i, buffer.getLong());
-                               }
-                               break;
-                       default:
-                               throw new DMLRuntimeException("Unsupported value type: " + valueType);
+                                       break;
+                default:
+                                       throw new DMLRuntimeException("Unsupported value type: " + valueType);
+                       }
                }
-
-               return array;
        }
 
        public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb) {
diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py
index 38fdab8ca7..61a4769e80 100644
--- a/src/main/python/systemds/utils/converters.py
+++ b/src/main/python/systemds/utils/converters.py
@@ -56,7 +56,7 @@ def numpy_to_matrix_block(sds, np_arr: np.array):
     else:
         arr = np_arr.ravel().astype(np.float64)
         value_type = jvm.org.apache.sysds.common.Types.ValueType.FP64
-    buf = bytearray(arr.tobytes())
+    buf = arr.tobytes()
 
     # Send data to java.
     try:
@@ -82,31 +82,38 @@ def matrix_block_to_numpy(jvm: JVMView, mb: JavaObject):
     )
 
 
-def convert_column(jvm, rows, j, col_type, pd_col, fb, col_name):
-    """Converts a given pandas column to a FrameBlock representation.
+def convert(jvm, fb, idx, num_elements, value_type, pd_series, conversion="column"):
+    """Converts a given pandas column or row to a FrameBlock representation.
 
     :param jvm: The JVMView of the current SystemDS context.
-    :param rows: The number of rows in the pandas DataFrame.
-    :param j: The current column index.
-    :param col_type: The ValueType of the column.
-    :param pd_col: The pandas column to convert.
+    :param fb: The FrameBlock to add the column to.
+    :param idx: The current column/row index.
+    :param num_elements: The number of rows/columns in the pandas DataFrame.
+    :param value_type: The ValueType of the column/row.
+    :param pd_series: The pandas column or row to convert.
+    :param conversion: The type of conversion to perform. Can be either "column" or "row".
     """
-    if col_type == jvm.org.apache.sysds.common.Types.ValueType.STRING:
+    if pd_series.dtype == "string" or pd_series.dtype == "object":
         byte_data = bytearray()
-        for value in pd_col.astype(str):
+        for value in pd_series.astype(str):
             encoded_value = value.encode("utf-8")
-            byte_data.extend(struct.pack(">I", len(encoded_value)))
+            byte_data.extend(struct.pack("<I", len(encoded_value)))
             byte_data.extend(encoded_value)
     else:
-        col_data = pd_col.fillna("").to_numpy()
-        byte_data = bytearray(col_data.tobytes())
-
-    converted_array = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert(
-        byte_data, rows, col_type
-    )
-
-    fb.setColumnName(j, str(col_name))
-    fb.setColumn(j, converted_array)
+        byte_data = pd_series.fillna("").to_numpy().tobytes()
+
+    if conversion == "column":
+        converted_array = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert(
+            byte_data, num_elements, value_type
+        )
+        fb.setColumn(idx, converted_array)
+    elif conversion == "row":
+        converted_array = (
+            jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convertRow(
+                byte_data, num_elements, value_type
+            )
+        )
+        fb.setRow(idx, converted_array)
 
 
 def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
@@ -121,58 +128,82 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
 
     jvm: JVMView = sds.java_gateway.jvm
     java_gate: JavaGateway = sds.java_gateway
+    jc_ValueType = jvm.org.apache.sysds.common.Types.ValueType
 
     # pandas type mapping to systemds Valuetypes
     data_type_mapping = {
-        np.dtype(np.object_): jvm.org.apache.sysds.common.Types.ValueType.STRING,
-        np.dtype(np.int64): jvm.org.apache.sysds.common.Types.ValueType.INT64,
-        np.dtype(np.float64): jvm.org.apache.sysds.common.Types.ValueType.FP64,
-        np.dtype(np.bool_): jvm.org.apache.sysds.common.Types.ValueType.BOOLEAN,
-        np.dtype("<M8[ns]"): jvm.org.apache.sysds.common.Types.ValueType.STRING,
-        np.dtype(np.int32): jvm.org.apache.sysds.common.Types.ValueType.INT32,
-        np.dtype(np.float32): jvm.org.apache.sysds.common.Types.ValueType.FP32,
-        np.dtype(np.uint8): jvm.org.apache.sysds.common.Types.ValueType.UINT8,
-        np.dtype(np.str_): jvm.org.apache.sysds.common.Types.ValueType.CHARACTER,
+        "object": jc_ValueType.STRING,
+        "int64": jc_ValueType.INT64,
+        "float64": jc_ValueType.FP64,
+        "bool": jc_ValueType.BOOLEAN,
+        "string": jc_ValueType.STRING,
+        "int32": jc_ValueType.INT32,
+        "float32": jc_ValueType.FP32,
+        "uint8": jc_ValueType.UINT8,
     }
-    schema = []
-    col_names = []
 
-    for col_name, dtype in dict(pd_df.dtypes).items():
+    # schema and j_valueTypeArray are essentially doubled but accessing a Java array is costly,
+    # while also being necessary for FrameBlock, so we create one for Python and one for Java.
+    col_names = []
+    schema = []
+    j_valueTypeArray = java_gate.new_array(jc_ValueType, cols)
+    j_colNameArray = java_gate.new_array(jvm.java.lang.String, cols)
+    for i, (col_name, dtype) in enumerate(dict(pd_df.dtypes).items()):
+        j_colNameArray[i] = str(col_name)
         col_names.append(col_name)
-        if dtype in data_type_mapping.keys():
-            schema.append(data_type_mapping[dtype])
+        type_key = str(dtype)
+        if type_key in data_type_mapping:
+            schema.append(data_type_mapping[type_key])
+            j_valueTypeArray[i] = data_type_mapping[type_key]
         else:
-            schema.append(jvm.org.apache.sysds.common.Types.ValueType.STRING)
+            schema.append(jc_ValueType.STRING)
+            j_valueTypeArray[i] = jc_ValueType.STRING
+
     try:
-        jc_ValueType = jvm.org.apache.sysds.common.Types.ValueType
         jc_String = jvm.java.lang.String
         jc_FrameBlock = jvm.org.apache.sysds.runtime.frame.data.FrameBlock
-        j_valueTypeArray = java_gate.new_array(jc_ValueType, len(schema))
-
         # execution speed increases with optimized code when the number of rows exceeds 4
         if rows > 4:
-            for i in range(len(schema)):
-                j_valueTypeArray[i] = schema[i]
-
-            fb = jc_FrameBlock(j_valueTypeArray, rows)
+            # Row conversion if more columns than rows and all columns have the same type, otherwise column
+            conversion_type = (
+                "row" if cols > rows and len(set(pd_df.dtypes)) == 1 else "column"
+            )
+            if conversion_type == "row":
+                pd_df = pd_df.transpose()
+                col_names = pd_df.columns.tolist()  # re-calculate col names
+
+            fb = jc_FrameBlock(
+                j_valueTypeArray,
+                j_colNameArray,
+                rows if conversion_type == "column" else None,
+            )
+            if conversion_type == "row":
+                fb.ensureAllocatedColumns(rows)
 
+            # We use .submit() with explicit .result() calling to properly propagate exceptions
             with concurrent.futures.ThreadPoolExecutor() as executor:
-                executor.map(
-                    lambda j, col_name: convert_column(
-                        jvm, rows, j, schema[j], pd_df[col_name], fb, col_name
-                    ),
-                    range(len(col_names)),
-                    col_names,
-                )
+                futures = [
+                    executor.submit(
+                        convert,
+                        jvm,
+                        fb,
+                        i,
+                        rows if conversion_type == "column" else cols,
+                        schema[i],
+                        pd_df[col_name],
+                        conversion_type,
+                    )
+                    for i, col_name in enumerate(col_names)
+                ]
+
+                for future in concurrent.futures.as_completed(futures):
+                    future.result()
 
             return fb
         else:
             j_dataArray = java_gate.new_array(jc_String, rows, cols)
-            j_colNameArray = java_gate.new_array(jc_String, len(col_names))
 
             for j, col_name in enumerate(col_names):
-                j_valueTypeArray[j] = schema[j]
-                j_colNameArray[j] = str(col_names[j])
                 col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)
 
                 for i in range(col_data.shape[0]):
diff --git a/src/main/python/tests/iotests/test_io_pandas_systemds.py b/src/main/python/tests/iotests/test_io_pandas_systemds.py
index 0ddbf63a5d..7f599ea163 100644
--- a/src/main/python/tests/iotests/test_io_pandas_systemds.py
+++ b/src/main/python/tests/iotests/test_io_pandas_systemds.py
@@ -27,18 +27,22 @@ import pandas as pd
 from systemds.context import SystemDSContext
 
 
+def create_dataframe(n_rows, n_cols, mixed=True):
+    return pd.DataFrame(
+        {
+            f"C{i+1}": [
+                f"col{i+1}_string_{j}" if i == 0 and mixed else j + i
+                for j in range(n_rows)
+            ]
+            for i in range(n_cols)
+        }
+    )
+
+
 class TestPandasFromToSystemds(unittest.TestCase):
 
     sds: SystemDSContext = None
     temp_dir: str = "tests/iotests/temp_write_csv/"
-    n_cols = 3
-    n_rows = 5
-    df = pd.DataFrame(
-        {
-            "C1": [f"col1_string_{i}" for i in range(n_rows)],
-            "C2": [i for i in range(n_rows)],
-        }
-    )
 
     @classmethod
     def setUpClass(cls):
@@ -52,22 +56,36 @@ class TestPandasFromToSystemds(unittest.TestCase):
         shutil.rmtree(cls.temp_dir, ignore_errors=True)
 
     def test_into_systemds(self):
-        # Transfer into SystemDS and write to CSV
-        frame = self.sds.from_pandas(self.df)
-        frame.write(
-            self.temp_dir + "into_systemds.csv", format="csv", header=True
-        ).compute(verbose=True)
+        combinations = [  # (n_rows, n_cols, mixed)
+            (3, 2, True),  # Test un-parallelized code (rows <= 4)
+            (10, 5, True),  # Test parallelized column-wise code
+            (5, 10, True),  # Test parallelized column-wise mixed code
+            (5, 10, False),  # Test parallelized row-wise code
+        ]
 
-        # Read the CSV file using pandas
-        result_df = pd.read_csv(self.temp_dir + "into_systemds.csv")
+        for n_rows, n_cols, mixed in combinations:
+            df = create_dataframe(n_rows, n_cols, mixed)
 
-        # Verify the data
-        self.assertTrue(isinstance(result_df, pd.DataFrame))
-        self.assertTrue(self.df.equals(result_df))
+            # Transfer into SystemDS and write to CSV
+            frame = self.sds.from_pandas(df)
+            frame.write(
+                self.temp_dir + "into_systemds.csv", format="csv", header=True
+            ).compute(verbose=True)
+
+            # Read the CSV file using pandas
+            result_df = pd.read_csv(self.temp_dir + "into_systemds.csv")
+
+            # Verify the data
+            self.assertTrue(isinstance(result_df, pd.DataFrame))
+            self.assertTrue(df.equals(result_df))
 
     def test_out_of_systemds(self):
+        n_rows = 3
+        n_cols = 2
+        df = create_dataframe(n_rows, n_cols)
+
         # Create a CSV file to read into SystemDS
-        self.df.to_csv(self.temp_dir + "out_of_systemds.csv", header=False, index=False)
+        df.to_csv(self.temp_dir + "out_of_systemds.csv", header=False, index=False)
 
         # Read the CSV file into SystemDS and then compute back to pandas
         frame = self.sds.read(
@@ -79,7 +97,7 @@ class TestPandasFromToSystemds(unittest.TestCase):
         result_df["C2"] = result_df["C2"].astype(int)
 
         self.assertTrue(isinstance(result_df, pd.DataFrame))
-        self.assertTrue(self.df.equals(result_df))
+        self.assertTrue(df.equals(result_df))
 
 
 if __name__ == "__main__":
diff --git a/src/test/java/org/apache/sysds/test/component/frame/FrameGetSetTest.java b/src/test/java/org/apache/sysds/test/component/frame/FrameGetSetTest.java
index 63f617711a..b5c57341e0 100644
--- a/src/test/java/org/apache/sysds/test/component/frame/FrameGetSetTest.java
+++ b/src/test/java/org/apache/sysds/test/component/frame/FrameGetSetTest.java
@@ -19,6 +19,10 @@
 
 package org.apache.sysds.test.component.frame;
 
+import static org.junit.Assert.assertEquals;
+
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.sysds.common.Types.ValueType;
@@ -171,4 +175,43 @@ public class FrameGetSetTest extends AutomatedTestBase
                        throw new RuntimeException(ex);
                }
        }
+
+
+       @Test
+       public void testSetRow() {
+               FrameBlock frame = new FrameBlock(schemaMixed, "0", rows);
+
+               frame.setRow(2, new Object[] {"2", 2.0, 2L, true});
+
+               assertEquals(frame.get(2, 0), "2");
+               assertEquals(frame.get(2, 1), 2.0);
+               assertEquals(frame.get(2, 2), 2L);
+               assertEquals(frame.get(2, 3), true);
+       }
+
+       @Test
+       public void testAppendColumnChunk() {
+               FrameBlock frame = new FrameBlock(schemaMixed, rows);
+
+               Array<?> chunk = ArrayFactory.create(new double[] {1.0, 2.0});
+               Array<?> chunk2 = ArrayFactory.create(new double[] {3.0, 4.0});
+               frame.appendColumnChunk(1, chunk);
+               frame.appendColumnChunk(1, chunk2);
+
+               assertEquals(frame.get(0, 1), 1.0);
+               assertEquals(frame.get(1, 1), 2.0);
+               assertEquals(frame.get(2, 1), 3.0);
+               assertEquals(frame.get(3, 1), 4.0);
+       }
+
+       @Test
+       public void testSetColumnChunk() {
+               FrameBlock frame = new FrameBlock(schemaMixed, "0", rows);
+
+               Array<?> chunk = ArrayFactory.create(new double[] {1.0, 2.0});
+               frame.setColumnChunk(1, chunk, 5, rows);
+
+               assertEquals(frame.get(5, 1), 1.0);
+               assertEquals(frame.get(6, 1), 2.0);
+       }
 }
diff --git a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java b/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
index 980165c3ab..466c3337d8 100644
--- a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
+++ b/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
@@ -170,9 +170,7 @@ public class Py4jConverterUtilsTest {
                ByteBuffer buffer = ByteBuffer.allocate(4 + strings[0].length() + 4 + strings[1].length());
                buffer.order(ByteOrder.LITTLE_ENDIAN);
                for(String s : strings) {
-                       buffer.order(ByteOrder.BIG_ENDIAN);
                        buffer.putInt(s.length());
-                       buffer.order(ByteOrder.LITTLE_ENDIAN);
                        buffer.put(s.getBytes(StandardCharsets.UTF_8));
                }
                Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.STRING);
@@ -199,6 +197,32 @@ public class Py4jConverterUtilsTest {
                }
        }
 
+       @Test
+       public void testConvertRow() {
+               int numElements = 4;
+               byte[] data = {1, 2, 3, 4};
+               Object[] row = Py4jConverterUtils.convertRow(data, numElements, 
Types.ValueType.UINT8);
+               assertNotNull(row);
+               assertEquals(4, row.length);
+               assertEquals(1, row[0]);
+               assertEquals(2, row[1]);
+               assertEquals(3, row[2]);
+               assertEquals(4, row[3]);
+       }
+
+       @Test
+       public void testConvertFused() {
+               int numElements = 1;
+               byte[] data = {1, 2, 3, 4};
+               Types.ValueType[] valueTypes = {ValueType.UINT8, ValueType.UINT8, ValueType.UINT8, ValueType.UINT8};
+               Array<?>[] arrays = Py4jConverterUtils.convertFused(data, numElements, valueTypes);
+               assertNotNull(arrays);
+               assertEquals(4, arrays.length);
+               for(int i = 0; i < 4; i++) {
+                       assertEquals(1 + i, arrays[i].get(0));
+               }
+       }
+
        @Test(expected = Exception.class)
        public void nullData() {
                Py4jConverterUtils.convert(null, 14, ValueType.BOOLEAN);
