martin-g commented on code in PR #2521:
URL: https://github.com/apache/datafusion-comet/pull/2521#discussion_r2497967559


##########
docs/source/contributor-guide/debugging.md:
##########
@@ -189,3 +189,43 @@ This produces output like the following:
 
 Additionally, you can place a `log4rs.yaml` configuration file inside the Comet configuration directory specified by the `COMET_CONF_DIR` environment variable to enable more advanced logging configurations. This file uses the [log4rs YAML configuration format](https://docs.rs/log4rs/latest/log4rs/#configuration-via-a-yaml-file).
 For example, see: [log4rs.yaml](https://github.com/apache/datafusion-comet/blob/main/conf/log4rs.yaml).
+
+### Debugging Memory Reservations
+
+Set `spark.comet.debug.memory=true` to log all calls that grow or shrink memory reservations.
+
+Example log output:
+
+```
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256232960) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256375168) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256899456) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257296128) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257820416) returning Err
+[Task 486] MemoryPool[ExternalSorterMerge[6]].shrink(10485760)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(150464)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(146688)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(137856)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(141952)
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(524288)
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(68928) returning Ok
+```
+
+When backtraces are enabled (see the earlier section), they will be included for failed allocations.
+
+There are Python scripts in `dev/scripts` that can be used to produce charts for a particular Spark task.
+
+First, extract the memory logging and write to CSV:
+
+```shell
+python3 dev/scripts/mem_debug_to_csv.py /path/to/executor/log > /tmp/mem.csv
+```
+
+Next, generate a chart from the CSV file for a specific Spark task:
+
+```shell
+python3 dev/scripts/plot_memory_usage.py /tmp/mem.csv --task 1234

Review Comment:
   ```suggestion
   python3 dev/scripts/plot_memory_usage.py /tmp/mem.csv
   ```
   `plot_memory_usage.py` does not accept a `--task` argument



##########
dev/scripts/mem_debug_to_csv.py:
##########
@@ -0,0 +1,73 @@
+#!/usr/bin/python
+##############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##############################################################################
+
+import argparse
+import re
+import sys
+
+def main(file, task_filter):
+    # keep track of running total allocation per consumer
+    alloc = {}
+
+    # open file
+    with open(file) as f:
+        # iterate over lines in file
+        print("name,size,label")
+        for line in f:
+            # print(line, file=sys.stderr)
+
+            # example line: [Task 486] MemoryPool[HashJoinInput[6]].shrink(1000)
+            # parse consumer name
+            re_match = re.search('\[Task (.*)\] MemoryPool\[(.*)\]\.(try_grow|grow|shrink)\(([0-9]*)\)', line, re.IGNORECASE)
+            if re_match:
+                try:
+                    task = int(re_match.group(1))
+                    if task != task_filter:
+                        continue
+
+                    consumer = re_match.group(2)
+                    method = re_match.group(3)
+                    size = int(re_match.group(4))
+
+                    if alloc.get(consumer) is None:
+                        alloc[consumer] = size
+                    else:
+                        if method == "grow" or method == "try_grow":
+                            if "Err" in line:
+                                # do not update allocation if try_grow failed
+                                # annotate this entry so it can be shown in the chart
+                                print(consumer, ",", alloc[consumer], ",ERR")
+                            else:
+                                alloc[consumer] = alloc[consumer] + size
+                        elif method == "shrink":
+                            alloc[consumer] = alloc[consumer] - size
+
+                    print(consumer, ",", alloc[consumer])
+
+                except Exception as e:
+                    print("error parsing", line, e, file=sys.stderr)
+
+
+if __name__ == "__main__":
+    ap = argparse.ArgumentParser(description="Generate CSV From memory debug output")
+    ap.add_argument("--task", default=None, help="Task ID.")
+    ap.add_argument("--file", default=None, help="Spark log containing memory 
debug output")

Review Comment:
   The `file` argument seems to be mandatory, not optional.
   It is used at https://github.com/apache/datafusion-comet/pull/2521/files#diff-b1b45e935652f7568175f6d7b83ff247fab24d507c782ef1e53392a53410e095R30
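
   One way to enforce that, if the flag form is kept (just a sketch):
   ```python
   ap.add_argument("--file", required=True,
                   help="Spark log containing memory debug output")
   ```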



##########
common/src/main/scala/org/apache/comet/CometConf.scala:
##########
@@ -54,6 +54,9 @@ object CometConf extends ShimCometConf {
   private val TRACING_GUIDE = "For more information, refer to the Comet Tracing " +
     "Guide (https://datafusion.apache.org/comet/user-guide/tracing.html)"
 
+  private val DEBUGGING_GUIDE = "For more information, refer to the Comet Debugging " +
+    "Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html"

Review Comment:
   ```suggestion
       "Guide 
(https://datafusion.apache.org/comet/contributor-guide/debugging.html)"
   ```



##########
docs/source/contributor-guide/debugging.md:
##########
@@ -189,3 +189,43 @@ This produces output like the following:
 
 Additionally, you can place a `log4rs.yaml` configuration file inside the Comet configuration directory specified by the `COMET_CONF_DIR` environment variable to enable more advanced logging configurations. This file uses the [log4rs YAML configuration format](https://docs.rs/log4rs/latest/log4rs/#configuration-via-a-yaml-file).
 For example, see: [log4rs.yaml](https://github.com/apache/datafusion-comet/blob/main/conf/log4rs.yaml).
+
+### Debugging Memory Reservations
+
+Set `spark.comet.debug.memory=true` to log all calls that grow or shrink memory reservations.
+
+Example log output:
+
+```
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256232960) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256375168) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256899456) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257296128) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257820416) returning Err
+[Task 486] MemoryPool[ExternalSorterMerge[6]].shrink(10485760)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(150464)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(146688)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(137856)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(141952)
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(524288)
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(68928) returning Ok
+```
+
+When backtraces are enabled (see the earlier section), they will be included for failed allocations.
+
+There are Python scripts in `dev/scripts` that can be used to produce charts for a particular Spark task.
+
+First, extract the memory logging and write to CSV:
+
+```shell
+python3 dev/scripts/mem_debug_to_csv.py /path/to/executor/log > /tmp/mem.csv

Review Comment:
   Currently `mem_debug_to_csv.py` uses `--file ...` but I think the file should be a positional argument, as above.
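
   A sketch of the positional form; `type=int` avoids the manual `int(...)` call (though `main` would still need to treat a `None` task filter as "no filtering"):
   ```python
   ap = argparse.ArgumentParser(description="Generate CSV from memory debug output")
   ap.add_argument("file", help="Spark log containing memory debug output")
   ap.add_argument("--task", type=int, default=None, help="Task ID.")
   args = ap.parse_args()
   main(args.file, args.task)
   ```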



##########
dev/scripts/mem_debug_to_csv.py:
##########
@@ -0,0 +1,73 @@
+#!/usr/bin/python
+##############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##############################################################################
+
+import argparse
+import re
+import sys
+
+def main(file, task_filter):
+    # keep track of running total allocation per consumer
+    alloc = {}
+
+    # open file
+    with open(file) as f:
+        # iterate over lines in file
+        print("name,size,label")
+        for line in f:
+            # print(line, file=sys.stderr)
+
+            # example line: [Task 486] MemoryPool[HashJoinInput[6]].shrink(1000)
+            # parse consumer name
+            re_match = re.search('\[Task (.*)\] MemoryPool\[(.*)\]\.(try_grow|grow|shrink)\(([0-9]*)\)', line, re.IGNORECASE)
+            if re_match:
+                try:
+                    task = int(re_match.group(1))
+                    if task != task_filter:
+                        continue
+
+                    consumer = re_match.group(2)
+                    method = re_match.group(3)
+                    size = int(re_match.group(4))
+
+                    if alloc.get(consumer) is None:
+                        alloc[consumer] = size
+                    else:
+                        if method == "grow" or method == "try_grow":
+                            if "Err" in line:
+                                # do not update allocation if try_grow failed
+                                # annotate this entry so it can be shown in the chart
+                                print(consumer, ",", alloc[consumer], ",ERR")

Review Comment:
   This will print two rows for the same consumer: one with the `ERR` label and another one without a label at line 62. I think you should just assign `,ERR` to a new `label` variable that is then used at line 62.
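
   Something like this sketch, keeping the existing first-occurrence initialisation above it; the single `print` then replaces both of the current ones:
   ```python
   label = ""
   if method in ("grow", "try_grow"):
       if "Err" in line:
           # do not update the allocation if try_grow failed,
           # but annotate the row so it can be shown in the chart
           label = "ERR"
       else:
           alloc[consumer] += size
   elif method == "shrink":
       alloc[consumer] -= size
   # one row per parsed line; the label is empty except on failed try_grow
   print(f"{consumer},{alloc[consumer]},{label}")
   ```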



##########
native/core/src/execution/jni_api.rs:
##########
@@ -187,6 +188,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
         let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
         let max_temp_directory_size =
            spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 * 1024 * 1024);
+        let logging_memory_pool = spark_config.get_bool(COMET_DEBUG_MEMORY);

Review Comment:
   ```suggestion
           let debug_memory_enabled = spark_config.get_bool(COMET_DEBUG_MEMORY);
   ```



##########
dev/scripts/mem_debug_to_csv.py:
##########
@@ -0,0 +1,73 @@
+#!/usr/bin/python

Review Comment:
   ```suggestion
   #!/usr/bin/env python3
   ```



##########
dev/scripts/mem_debug_to_csv.py:
##########


Review Comment:
   Make the file executable - `chmod +x`



##########
dev/scripts/mem_debug_to_csv.py:
##########
@@ -0,0 +1,73 @@
+#!/usr/bin/python
+##############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##############################################################################
+
+import argparse
+import re
+import sys
+
+def main(file, task_filter):
+    # keep track of running total allocation per consumer
+    alloc = {}
+
+    # open file
+    with open(file) as f:
+        # iterate over lines in file
+        print("name,size,label")
+        for line in f:
+            # print(line, file=sys.stderr)
+
+            # example line: [Task 486] MemoryPool[HashJoinInput[6]].shrink(1000)
+            # parse consumer name
+            re_match = re.search('\[Task (.*)\] MemoryPool\[(.*)\]\.(try_grow|grow|shrink)\(([0-9]*)\)', line, re.IGNORECASE)
+            if re_match:
+                try:
+                    task = int(re_match.group(1))
+                    if task != task_filter:
+                        continue
+
+                    consumer = re_match.group(2)
+                    method = re_match.group(3)
+                    size = int(re_match.group(4))
+
+                    if alloc.get(consumer) is None:
+                        alloc[consumer] = size
+                    else:
+                        if method == "grow" or method == "try_grow":
+                            if "Err" in line:
+                                # do not update allocation if try_grow failed
+                                # annotate this entry so it can be shown in the chart
+                                print(consumer, ",", alloc[consumer], ",ERR")
+                            else:
+                                alloc[consumer] = alloc[consumer] + size
+                        elif method == "shrink":
+                            alloc[consumer] = alloc[consumer] - size
+
+                    print(consumer, ",", alloc[consumer])
+
+                except Exception as e:
+                    print("error parsing", line, e, file=sys.stderr)
+
+
+if __name__ == "__main__":
+    ap = argparse.ArgumentParser(description="Generate CSV From memory debug output")
+    ap.add_argument("--task", default=None, help="Task ID.")
+    ap.add_argument("--file", default=None, help="Spark log containing memory 
debug output")
+    args = ap.parse_args()
+    main(args.file, int(args.task))

Review Comment:
   The task is an optional parameter. Calling `int(None)` will fail with a `TypeError`.
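
   One way to guard against that (a sketch):
   ```python
   args = ap.parse_args()
   task_filter = int(args.task) if args.task is not None else None
   main(args.file, task_filter)
   ```
   `main` would then also need to skip the `task != task_filter` check when `task_filter` is `None`.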



##########
docs/source/user-guide/latest/configs.md:
##########
@@ -35,6 +35,7 @@ Comet provides the following configuration settings.
 | spark.comet.convert.json.enabled | When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false |
 | spark.comet.convert.parquet.enabled | When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false |
 | spark.comet.debug.enabled | Whether to enable debug mode for Comet. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false |
+| spark.comet.debug.memory | When enabled, log all native memory pool interactions. For more information, refer to the Comet Debugging Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html. | false |

Review Comment:
   ```suggestion
   | spark.comet.debug.memory | When enabled, log all native memory pool interactions. For more information, refer to the Comet Debugging Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html). | false |
   ```



##########
dev/scripts/plot_memory_usage.py:
##########
@@ -0,0 +1,69 @@
+#!/usr/bin/python

Review Comment:
   ```suggestion
   #!/usr/bin/env python3
   ```



##########
dev/scripts/mem_debug_to_csv.py:
##########
@@ -0,0 +1,73 @@
+#!/usr/bin/python
+##############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##############################################################################
+
+import argparse
+import re
+import sys
+
+def main(file, task_filter):
+    # keep track of running total allocation per consumer
+    alloc = {}
+
+    # open file
+    with open(file) as f:
+        # iterate over lines in file
+        print("name,size,label")
+        for line in f:
+            # print(line, file=sys.stderr)
+
+            # example line: [Task 486] MemoryPool[HashJoinInput[6]].shrink(1000)
+            # parse consumer name
+            re_match = re.search('\[Task (.*)\] MemoryPool\[(.*)\]\.(try_grow|grow|shrink)\(([0-9]*)\)', line, re.IGNORECASE)
+            if re_match:
+                try:
+                    task = int(re_match.group(1))
+                    if task != task_filter:
+                        continue
+
+                    consumer = re_match.group(2)
+                    method = re_match.group(3)
+                    size = int(re_match.group(4))
+
+                    if alloc.get(consumer) is None:
+                        alloc[consumer] = size
+                    else:
+                        if method == "grow" or method == "try_grow":
+                            if "Err" in line:
+                                # do not update allocation if try_grow failed
+                                # annotate this entry so it can be shown in the chart
+                                print(consumer, ",", alloc[consumer], ",ERR")
+                            else:
+                                alloc[consumer] = alloc[consumer] + size
+                        elif method == "shrink":
+                            alloc[consumer] = alloc[consumer] - size
+
+                    print(consumer, ",", alloc[consumer])

Review Comment:
   ```suggestion
                       print(f"{consumer},{alloc[consumer]}")
   ```
   nit: to avoid the extra spaces around each item



##########
dev/scripts/plot_memory_usage.py:
##########


Review Comment:
   Make the file executable - `chmod +x`



##########
dev/scripts/mem_debug_to_csv.py:
##########
@@ -0,0 +1,73 @@
+#!/usr/bin/python
+##############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##############################################################################
+
+import argparse
+import re
+import sys
+
+def main(file, task_filter):
+    # keep track of running total allocation per consumer
+    alloc = {}
+
+    # open file
+    with open(file) as f:
+        # iterate over lines in file
+        print("name,size,label")
+        for line in f:
+            # print(line, file=sys.stderr)
+
+            # example line: [Task 486] MemoryPool[HashJoinInput[6]].shrink(1000)
+            # parse consumer name
+            re_match = re.search('\[Task (.*)\] MemoryPool\[(.*)\]\.(try_grow|grow|shrink)\(([0-9]*)\)', line, re.IGNORECASE)
+            if re_match:
+                try:
+                    task = int(re_match.group(1))
+                    if task != task_filter:
+                        continue
+
+                    consumer = re_match.group(2)
+                    method = re_match.group(3)
+                    size = int(re_match.group(4))
+
+                    if alloc.get(consumer) is None:
+                        alloc[consumer] = size

Review Comment:
   Would it be possible for the `method` to be `shrink` for the first occurrence?
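
   If it can, seeding the running total with the shrunk size would be wrong. A defensive sketch:
   ```python
   current = alloc.get(consumer, 0)  # an unseen consumer starts at 0
   if method in ("grow", "try_grow") and "Err" not in line:
       current += size
   elif method == "shrink":
       current -= size
   alloc[consumer] = current
   ```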



##########
dev/scripts/plot_memory_usage.py:
##########
@@ -0,0 +1,69 @@
+#!/usr/bin/python
+##############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##############################################################################
+
+import pandas as pd
+import matplotlib.pyplot as plt
+import sys
+
+def plot_memory_usage(csv_file):
+    # Read the CSV file
+    df = pd.read_csv(csv_file)
+
+    # Create time index based on row order (each row is a sequential time point)
+    df['time'] = range(len(df))
+
+    # Pivot the data to have consumers as columns
+    pivot_df = df.pivot(index='time', columns='name', values='size')
+    pivot_df = pivot_df.fillna(method='ffill').fillna(0)

Review Comment:
   ```suggestion
       pivot_df = pivot_df.ffill().fillna(0)
   ```
   https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.fillna.html - `Deprecated since version 2.1.0: Use ffill or bfill instead.`



##########
native/core/src/execution/jni_api.rs:
##########
@@ -225,6 +227,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
             let memory_pool =
                create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);
 
+            let memory_pool = if logging_memory_pool {

Review Comment:
   ```suggestion
               let memory_pool = if debug_memory_enabled {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
