xumingming commented on code in PR #5799:
URL: https://github.com/apache/incubator-gluten/pull/5799#discussion_r1606011589
##########
cpp/core/operators/c2r/ColumnarToRow.h:
##########
@@ -27,7 +28,12 @@ class ColumnarToRowConverter {
virtual ~ColumnarToRowConverter() = default;
- virtual void convert(std::shared_ptr<ColumnarBatch> cb = nullptr) = 0;
+ virtual void
+ convert(std::shared_ptr<ColumnarBatch> cb = nullptr, int64_t rowId = 0,
int64_t memoryThreshold = INT64_MAX) = 0;
Review Comment:
Add comments for the new params
##########
cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc:
##########
@@ -64,14 +79,14 @@ void
VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb) {
offsets_.resize(numRows_, 0);
size_t offset = 0;
- for (auto rowIdx = 0; rowIdx < numRows_; ++rowIdx) {
- auto rowSize = fast_->serialize(rowIdx, (char*)(bufferAddress_ + offset));
- lengths_[rowIdx] = rowSize;
- if (rowIdx > 0) {
- offsets_[rowIdx] = offsets_[rowIdx - 1] + lengths_[rowIdx - 1];
+ for (auto i = 0; i < numRows_; ++i) {
+ auto rowSize = fast_->serialize(rowId + i, (char*)(bufferAddress_ +
offset));
+ lengths_[i] = rowSize;
+ if (i > 0) {
+ offsets_[i] = offsets_[i - 1] + lengths_[i - 1];
}
offset += rowSize;
}
}
-} // namespace gluten
+} // namespace gluten
Review Comment:
Leave one blank line at the end of the file.
##########
shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala:
##########
@@ -187,6 +187,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
+ def boltColumn2RowMemThreshold: Long =
Review Comment:
boltColumn2RowMemThreshold -> 'columnarToRowMemThreshold'? Other places all
use `columnarToRow` rather than `column2Row`; better to keep it consistent.
##########
cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc:
##########
@@ -16,46 +16,61 @@
*/
#include "VeloxColumnarToRowConverter.h"
+#include <velox/common/base/SuccinctPrinter.h>
+#include <cstdint>
#include "memory/VeloxColumnarBatch.h"
+#include "utils/exception.h"
#include "velox/row/UnsafeRowDeserializers.h"
#include "velox/row/UnsafeRowFast.h"
using namespace facebook;
namespace gluten {
-void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr
rowVector) {
- numRows_ = rowVector->size();
+int64_t VeloxColumnarToRowConverter::refreshStates(
+ facebook::velox::RowVectorPtr rowVector,
+ int64_t rowId,
+ int64_t memoryThreshold) {
+ auto vectorLength = rowVector->size();
numCols_ = rowVector->childrenSize();
fast_ = std::make_unique<velox::row::UnsafeRowFast>(rowVector);
size_t totalMemorySize = 0;
if (auto fixedRowSize =
velox::row::UnsafeRowFast::fixedRowSize(velox::asRowType(rowVector->type()))) {
- totalMemorySize += fixedRowSize.value() * numRows_;
+ GLUTEN_CHECK(
+ memoryThreshold > fixedRowSize.value(),
+ "Only 1 row exceed Column2RowMemoryThreshold, which is " +
velox::succinctBytes(memoryThreshold));
+ auto rowSize = fixedRowSize.value();
+ numRows_ = std::min<int64_t>(memoryThreshold / rowSize, vectorLength -
rowId);
+ totalMemorySize = rowSize * numRows_;
} else {
- for (auto i = 0; i < numRows_; ++i) {
- totalMemorySize += fast_->rowSize(i);
+ for (auto i = rowId; i < vectorLength; ++i) {
+ auto rowSize = fast_->rowSize(i);
+ if (UNLIKELY(totalMemorySize + rowSize > memoryThreshold)) {
+ GLUTEN_CHECK(
+ i >= 1, "Only 1 row exceed Column2RowMemoryThreshold, which is " +
velox::succinctBytes(memoryThreshold));
Review Comment:
ditto
##########
cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc:
##########
@@ -16,46 +16,61 @@
*/
#include "VeloxColumnarToRowConverter.h"
+#include <velox/common/base/SuccinctPrinter.h>
+#include <cstdint>
#include "memory/VeloxColumnarBatch.h"
+#include "utils/exception.h"
#include "velox/row/UnsafeRowDeserializers.h"
#include "velox/row/UnsafeRowFast.h"
using namespace facebook;
namespace gluten {
-void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr
rowVector) {
- numRows_ = rowVector->size();
+int64_t VeloxColumnarToRowConverter::refreshStates(
+ facebook::velox::RowVectorPtr rowVector,
+ int64_t rowId,
+ int64_t memoryThreshold) {
+ auto vectorLength = rowVector->size();
numCols_ = rowVector->childrenSize();
fast_ = std::make_unique<velox::row::UnsafeRowFast>(rowVector);
size_t totalMemorySize = 0;
if (auto fixedRowSize =
velox::row::UnsafeRowFast::fixedRowSize(velox::asRowType(rowVector->type()))) {
- totalMemorySize += fixedRowSize.value() * numRows_;
+ GLUTEN_CHECK(
+ memoryThreshold > fixedRowSize.value(),
+ "Only 1 row exceed Column2RowMemoryThreshold, which is " +
velox::succinctBytes(memoryThreshold));
Review Comment:
"spark.gluten.sql.columnarToRowMemoryThreshold(123bytes) is too small, it
can't hold even one row(456bytes)" might be better.
##########
cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc:
##########
@@ -16,46 +16,61 @@
*/
#include "VeloxColumnarToRowConverter.h"
+#include <velox/common/base/SuccinctPrinter.h>
+#include <cstdint>
#include "memory/VeloxColumnarBatch.h"
+#include "utils/exception.h"
#include "velox/row/UnsafeRowDeserializers.h"
#include "velox/row/UnsafeRowFast.h"
using namespace facebook;
namespace gluten {
-void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr
rowVector) {
- numRows_ = rowVector->size();
+int64_t VeloxColumnarToRowConverter::refreshStates(
Review Comment:
Why do we change the return type?
##########
shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala:
##########
@@ -514,6 +517,9 @@ object GlutenConfig {
// Batch size.
val GLUTEN_MAX_BATCH_SIZE_KEY = "spark.gluten.sql.columnar.maxBatchSize"
+ val GLUTEN_COLUMN_TO_ROW_MEM_THRESHOLD_KEY =
+ "spark.gluten.sql.columnToRowMemoryThreshold"
Review Comment:
spark.gluten.sql.columnToRowMemoryThreshold ->
spark.gluten.sql.columnarToRowMemoryThreshold for the same reason above
##########
cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc:
##########
@@ -16,46 +16,61 @@
*/
#include "VeloxColumnarToRowConverter.h"
+#include <velox/common/base/SuccinctPrinter.h>
+#include <cstdint>
#include "memory/VeloxColumnarBatch.h"
+#include "utils/exception.h"
#include "velox/row/UnsafeRowDeserializers.h"
#include "velox/row/UnsafeRowFast.h"
using namespace facebook;
namespace gluten {
-void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr
rowVector) {
- numRows_ = rowVector->size();
+int64_t VeloxColumnarToRowConverter::refreshStates(
+ facebook::velox::RowVectorPtr rowVector,
+ int64_t rowId,
+ int64_t memoryThreshold) {
+ auto vectorLength = rowVector->size();
numCols_ = rowVector->childrenSize();
fast_ = std::make_unique<velox::row::UnsafeRowFast>(rowVector);
size_t totalMemorySize = 0;
if (auto fixedRowSize =
velox::row::UnsafeRowFast::fixedRowSize(velox::asRowType(rowVector->type()))) {
- totalMemorySize += fixedRowSize.value() * numRows_;
+ GLUTEN_CHECK(
+ memoryThreshold > fixedRowSize.value(),
+ "Only 1 row exceed Column2RowMemoryThreshold, which is " +
velox::succinctBytes(memoryThreshold));
+ auto rowSize = fixedRowSize.value();
+ numRows_ = std::min<int64_t>(memoryThreshold / rowSize, vectorLength -
rowId);
+ totalMemorySize = rowSize * numRows_;
} else {
- for (auto i = 0; i < numRows_; ++i) {
- totalMemorySize += fast_->rowSize(i);
+ for (auto i = rowId; i < vectorLength; ++i) {
+ auto rowSize = fast_->rowSize(i);
+ if (UNLIKELY(totalMemorySize + rowSize > memoryThreshold)) {
+ GLUTEN_CHECK(
+ i >= 1, "Only 1 row exceed Column2RowMemoryThreshold, which is " +
velox::succinctBytes(memoryThreshold));
+ break;
+ } else {
+ totalMemorySize += rowSize;
+ }
+ numRows_ = i + 1 - rowId;
}
}
if (veloxBuffers_ == nullptr) {
- // First allocate memory
- veloxBuffers_ = velox::AlignedBuffer::allocate<uint8_t>(totalMemorySize,
veloxPool_.get());
- }
-
- if (veloxBuffers_->capacity() < totalMemorySize) {
Review Comment:
Why did we remove the memory reallocation code?
##########
cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc:
##########
@@ -16,46 +16,61 @@
*/
#include "VeloxColumnarToRowConverter.h"
+#include <velox/common/base/SuccinctPrinter.h>
+#include <cstdint>
#include "memory/VeloxColumnarBatch.h"
+#include "utils/exception.h"
#include "velox/row/UnsafeRowDeserializers.h"
#include "velox/row/UnsafeRowFast.h"
using namespace facebook;
namespace gluten {
-void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr
rowVector) {
- numRows_ = rowVector->size();
+int64_t VeloxColumnarToRowConverter::refreshStates(
+ facebook::velox::RowVectorPtr rowVector,
+ int64_t rowId,
+ int64_t memoryThreshold) {
+ auto vectorLength = rowVector->size();
numCols_ = rowVector->childrenSize();
fast_ = std::make_unique<velox::row::UnsafeRowFast>(rowVector);
size_t totalMemorySize = 0;
if (auto fixedRowSize =
velox::row::UnsafeRowFast::fixedRowSize(velox::asRowType(rowVector->type()))) {
- totalMemorySize += fixedRowSize.value() * numRows_;
+ GLUTEN_CHECK(
+ memoryThreshold > fixedRowSize.value(),
+ "Only 1 row exceed Column2RowMemoryThreshold, which is " +
velox::succinctBytes(memoryThreshold));
+ auto rowSize = fixedRowSize.value();
+ numRows_ = std::min<int64_t>(memoryThreshold / rowSize, vectorLength -
rowId);
+ totalMemorySize = rowSize * numRows_;
} else {
- for (auto i = 0; i < numRows_; ++i) {
- totalMemorySize += fast_->rowSize(i);
+ for (auto i = rowId; i < vectorLength; ++i) {
+ auto rowSize = fast_->rowSize(i);
+ if (UNLIKELY(totalMemorySize + rowSize > memoryThreshold)) {
+ GLUTEN_CHECK(
+ i >= 1, "Only 1 row exceed Column2RowMemoryThreshold, which is " +
velox::succinctBytes(memoryThreshold));
+ break;
+ } else {
+ totalMemorySize += rowSize;
+ }
+ numRows_ = i + 1 - rowId;
Review Comment:
nit: We can move this assignment out of the loop
##########
shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala:
##########
@@ -1031,6 +1038,12 @@ object GlutenConfig {
.checkValue(_ > 0, s"$GLUTEN_MAX_BATCH_SIZE_KEY must be positive.")
.createWithDefault(4096)
+ val GLUTEN_COLUMN_TO_ROW_MEM_THRESHOLD =
Review Comment:
GLUTEN_COLUMN_TO_ROW_MEM_THRESHOLD -> GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD
##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala:
##########
@@ -158,21 +158,29 @@ object VeloxColumnarToRowExec {
val rows = batch.numRows()
val beforeConvert = System.currentTimeMillis()
val batchHandle = ColumnarBatches.getNativeHandle(batch)
- val info =
- jniWrapper.nativeColumnarToRowConvert(batchHandle, c2rId)
+ var info =
+ jniWrapper.nativeColumnarToRowConvert(batchHandle, c2rId, 0)
convertTime += (System.currentTimeMillis() - beforeConvert)
new Iterator[InternalRow] {
var rowId = 0
+ var baseLength = 0
val row = new UnsafeRow(cols)
override def hasNext: Boolean = {
rowId < rows
}
override def next: UnsafeRow = {
- val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
+ if (rowId == baseLength + info.lengths.length) {
+ baseLength += info.lengths.length
+ val before = System.currentTimeMillis()
+ info = jniWrapper.nativeColumnarToRowConvert(batchHandle,
c2rId, rowId)
+ convertTime += (System.currentTimeMillis() - beforeConvert)
Review Comment:
beforeConvert should be `before`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]