pitrou commented on a change in pull request #12702:
URL: https://github.com/apache/arrow/pull/12702#discussion_r834065668



##########
File path: cpp/src/arrow/util/tracing_internal.cc
##########
@@ -184,6 +188,41 @@ opentelemetry::trace::Tracer* GetTracer() {
   return tracer.get();
 }
 
+int parseLine(char* line){

Review comment:
       We try to write C++ code not C code, so this function should take a 
`util::string_view` IMHO.

##########
File path: cpp/src/arrow/util/tracing_internal.h
##########
@@ -48,6 +48,12 @@ namespace tracing {
 ARROW_EXPORT
 opentelemetry::trace::Tracer* GetTracer();
 
+ARROW_EXPORT
+size_t GetMemoryUsed();
+
+ARROW_EXPORT
+size_t GetMemoryUsedByProcess();

Review comment:
       These functions should go into `arrow/util/io_util.{h,cc}` instead.

##########
File path: cpp/src/arrow/util/tracing_internal.h
##########
@@ -48,6 +48,12 @@ namespace tracing {
 ARROW_EXPORT
 opentelemetry::trace::Tracer* GetTracer();
 
+ARROW_EXPORT
+size_t GetMemoryUsed();
+
+ARROW_EXPORT
+size_t GetMemoryUsedByProcess();

Review comment:
       Please return `int64_t` instead.

##########
File path: cpp/src/arrow/util/tracing_internal.cc
##########
@@ -184,6 +188,41 @@ opentelemetry::trace::Tracer* GetTracer() {
   return tracer.get();
 }
 
+int parseLine(char* line){
+  // This assumes that a digit will be found and the line ends in " Kb".
+  int i = strlen(line);
+  const char* p = line;
+  while (*p <'0' || *p > '9') p++;
+  line[i-3] = '\0';
+  i = atoi(p);
+  return i;
+}
+
+size_t GetMemoryUsedByProcess() { //Note: this value is in KB!
+  FILE* file = fopen("/proc/self/status", "r");
+  size_t result = -1;
+  char line[128];
+
+  while (fgets(line, 128, file) != NULL){
+    if (strncmp(line, "VmRSS:", 6) == 0){
+      result = parseLine(line);
+      break;
+    }
+  }
+  fclose(file);
+  return result*1000;
+}
+
+size_t GetMemoryUsed() {
+  size_t total_memory_size;
+  size_t used_memory_size;
+  struct sysinfo si;
+  sysinfo(&si);

Review comment:
       This is unfortunately Linux-specific; we'll need a portable 
implementation (is there a third-party library that we can use to make our 
lives easier?).

##########
File path: cpp/src/arrow/util/tracing_internal.cc
##########
@@ -184,6 +188,41 @@ opentelemetry::trace::Tracer* GetTracer() {
   return tracer.get();
 }
 
+int parseLine(char* line){
+  // This assumes that a digit will be found and the line ends in " Kb".
+  int i = strlen(line);
+  const char* p = line;
+  while (*p <'0' || *p > '9') p++;
+  line[i-3] = '\0';
+  i = atoi(p);
+  return i;
+}
+
+size_t GetMemoryUsedByProcess() { //Note: this value is in KB!
+  FILE* file = fopen("/proc/self/status", "r");
+  size_t result = -1;
+  char line[128];
+
+  while (fgets(line, 128, file) != NULL){
+    if (strncmp(line, "VmRSS:", 6) == 0){
+      result = parseLine(line);
+      break;
+    }
+  }
+  fclose(file);
+  return result*1000;

Review comment:
       Are you sure it's not 1024, rather?

##########
File path: cpp/src/arrow/util/tracing_internal.cc
##########
@@ -184,6 +188,41 @@ opentelemetry::trace::Tracer* GetTracer() {
   return tracer.get();
 }
 
+int parseLine(char* line){
+  // This assumes that a digit will be found and the line ends in " Kb".
+  int i = strlen(line);
+  const char* p = line;
+  while (*p <'0' || *p > '9') p++;
+  line[i-3] = '\0';
+  i = atoi(p);
+  return i;
+}
+
+size_t GetMemoryUsedByProcess() { //Note: this value is in KB!
+  FILE* file = fopen("/proc/self/status", "r");

Review comment:
       1) You'll need to check for errors here; 2) would it be easier to use 
`std::fstream`?

##########
File path: cpp/src/arrow/compute/exec/aggregate_node.cc
##########
@@ -168,17 +168,26 @@ class ScalarAggregateNode : public ExecNode {
 
   Status DoConsume(const ExecBatch& batch, size_t thread_index) {
     util::tracing::Span span;
-    START_SPAN(span, "Consume",
-               {{"aggregate", ToStringExtra()},
-                {"node.label", label()},
-                {"batch.length", batch.length}});
+    START_SPAN(
+        span, "Consume",
+        {{"aggregate", ToStringExtra()},
+         {"node.label", label()},
+         {"batch.length", batch.length},
+         {"memory_pool_bytes",
+          plan_->exec_context()->memory_pool()->bytes_allocated()},
+         {"memory_used", arrow::internal::tracing::GetMemoryUsed()},
+         {"memory_used_process", 
arrow::internal::tracing::GetMemoryUsedByProcess()}});
     for (size_t i = 0; i < kernels_.size(); ++i) {
       util::tracing::Span span;
-      START_SPAN(span, aggs_[i].function,
-                 {{"function.name", aggs_[i].function},
-                  {"function.options",
-                   aggs_[i].options ? aggs_[i].options->ToString() : 
"<NULLPTR>"},
-                  {"function.kind", std::string(kind_name()) + "::Consume"}});
+      START_SPAN(
+          span, aggs_[i].function,
+          {{"function.name", aggs_[i].function},
+           {"function.options",
+            aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
+           {"function.kind", std::string(kind_name()) + "::Consume"},
+           {"memory_pool_bytes", 
plan_->exec_context()->memory_pool()->bytes_allocated()},
+           {"memory_used", arrow::internal::tracing::GetMemoryUsed()},
+           {"memory_used_process", 
arrow::internal::tracing::GetMemoryUsedByProcess()}});

Review comment:
       We should certainly avoid repeating this everywhere, and instead find a 
way to factor out those common span attributes (perhaps using a dedicated 
macro?).

##########
File path: cpp/src/arrow/compute/exec/aggregate_node.cc
##########
@@ -168,17 +168,26 @@ class ScalarAggregateNode : public ExecNode {
 
   Status DoConsume(const ExecBatch& batch, size_t thread_index) {
     util::tracing::Span span;
-    START_SPAN(span, "Consume",
-               {{"aggregate", ToStringExtra()},
-                {"node.label", label()},
-                {"batch.length", batch.length}});
+    START_SPAN(
+        span, "Consume",
+        {{"aggregate", ToStringExtra()},
+         {"node.label", label()},
+         {"batch.length", batch.length},
+         {"memory_pool_bytes",
+          plan_->exec_context()->memory_pool()->bytes_allocated()},
+         {"memory_used", arrow::internal::tracing::GetMemoryUsed()},
+         {"memory_used_process", 
arrow::internal::tracing::GetMemoryUsedByProcess()}});

Review comment:
       These calls are potentially costly since they will be issuing system 
calls (and perhaps even reading a file). I'm not sure this is desirable. @lidavidm 
What do you think?

##########
File path: cpp/src/arrow/util/tracing_internal.cc
##########
@@ -184,6 +188,41 @@ opentelemetry::trace::Tracer* GetTracer() {
   return tracer.get();
 }
 
+int parseLine(char* line){
+  // This assumes that a digit will be found and the line ends in " Kb".
+  int i = strlen(line);
+  const char* p = line;
+  while (*p <'0' || *p > '9') p++;
+  line[i-3] = '\0';
+  i = atoi(p);
+  return i;
+}
+
+size_t GetMemoryUsedByProcess() { //Note: this value is in KB!

Review comment:
       Which one? The one returned by the function?

##########
File path: cpp/src/arrow/util/tracing_internal.cc
##########
@@ -184,6 +188,41 @@ opentelemetry::trace::Tracer* GetTracer() {
   return tracer.get();
 }
 
+int parseLine(char* line){
+  // This assumes that a digit will be found and the line ends in " Kb".
+  int i = strlen(line);
+  const char* p = line;
+  while (*p <'0' || *p > '9') p++;
+  line[i-3] = '\0';
+  i = atoi(p);
+  return i;
+}
+
+size_t GetMemoryUsedByProcess() { //Note: this value is in KB!
+  FILE* file = fopen("/proc/self/status", "r");

Review comment:
       Also, this is probably Linux-specific...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to