nirandaperera commented on a change in pull request #10802: URL: https://github.com/apache/arrow/pull/10802#discussion_r683617227
########## File path: cpp/src/arrow/compute/kernels/vector_selection.cc ########## @@ -2146,6 +2146,184 @@ class TakeMetaFunction : public MetaFunction { } }; +// ---------------------------------------------------------------------- +// DropNull Implementation + +Status GetDropNullFilter(const Array& values, MemoryPool* memory_pool, + std::shared_ptr<arrow::BooleanArray>* out_array) { + auto bitmap_buffer = values.null_bitmap(); + *out_array = std::make_shared<BooleanArray>(values.length(), bitmap_buffer, nullptr, 0, + values.offset()); + return Status::OK(); +} + +Status CreateEmptyArray(std::shared_ptr<DataType> type, MemoryPool* memory_pool, + std::shared_ptr<Array>* output_array) { + std::unique_ptr<ArrayBuilder> builder; + RETURN_NOT_OK(MakeBuilder(memory_pool, type, &builder)); + RETURN_NOT_OK(builder->Resize(0)); + ARROW_ASSIGN_OR_RAISE(*output_array, builder->Finish()); + return Status::OK(); +} + +Result<std::shared_ptr<Array>> DropNullArray(const std::shared_ptr<Array>& values, + ExecContext* ctx) { + if (values->null_count() == 0) { + return values; + } + if (values->type()->Equals(arrow::null())) { + return std::make_shared<NullArray>(0); + } + std::shared_ptr<BooleanArray> drop_null_filter; + RETURN_NOT_OK(GetDropNullFilter(*values, ctx->memory_pool(), &drop_null_filter)); + + if (drop_null_filter->null_count() == drop_null_filter->length()) { + std::shared_ptr<Array> empty_array; + RETURN_NOT_OK(CreateEmptyArray(values->type(), ctx->memory_pool(), &empty_array)); + return empty_array; + } + auto options = FilterOptions::Defaults(); + ARROW_ASSIGN_OR_RAISE( + Datum result, + CallFunction("array_filter", {Datum(*values), Datum(*drop_null_filter)}, &options, + ctx)); + return result.make_array(); +} + +Result<std::shared_ptr<ChunkedArray>> DropNullChunkedArray(const ChunkedArray& values, + ExecContext* ctx) { + auto num_chunks = values.num_chunks(); + std::vector<std::shared_ptr<Array>> new_chunks(num_chunks); + for (int i = 0; i < num_chunks; i++) { + 
ARROW_ASSIGN_OR_RAISE(new_chunks[i], DropNullArray(values.chunk(i), ctx)); + } + return std::make_shared<ChunkedArray>(std::move(new_chunks)); +} + +Result<std::shared_ptr<RecordBatch>> DropNullRecordBatch(const RecordBatch& batch, + ExecContext* ctx) { Review comment: Shall we add an early termination here that checks whether any column has nulls and, if none do, simply returns the input? ########## File path: cpp/src/arrow/compute/kernels/vector_selection.cc ########## @@ -2146,6 +2146,184 @@ class TakeMetaFunction : public MetaFunction { } }; +// ---------------------------------------------------------------------- +// DropNull Implementation + +Status GetDropNullFilter(const Array& values, MemoryPool* memory_pool, + std::shared_ptr<arrow::BooleanArray>* out_array) { + auto bitmap_buffer = values.null_bitmap(); + *out_array = std::make_shared<BooleanArray>(values.length(), bitmap_buffer, nullptr, 0, + values.offset()); + return Status::OK(); +} + +Status CreateEmptyArray(std::shared_ptr<DataType> type, MemoryPool* memory_pool, + std::shared_ptr<Array>* output_array) { + std::unique_ptr<ArrayBuilder> builder; + RETURN_NOT_OK(MakeBuilder(memory_pool, type, &builder)); + RETURN_NOT_OK(builder->Resize(0)); + ARROW_ASSIGN_OR_RAISE(*output_array, builder->Finish()); + return Status::OK(); +} + +Result<std::shared_ptr<Array>> DropNullArray(const std::shared_ptr<Array>& values, + ExecContext* ctx) { + if (values->null_count() == 0) { + return values; + } + if (values->type()->Equals(arrow::null())) { + return std::make_shared<NullArray>(0); + } + std::shared_ptr<BooleanArray> drop_null_filter; + RETURN_NOT_OK(GetDropNullFilter(*values, ctx->memory_pool(), &drop_null_filter)); + + if (drop_null_filter->null_count() == drop_null_filter->length()) { + std::shared_ptr<Array> empty_array; + RETURN_NOT_OK(CreateEmptyArray(values->type(), ctx->memory_pool(), &empty_array)); + return empty_array; + } + auto options = FilterOptions::Defaults(); + ARROW_ASSIGN_OR_RAISE( + Datum result, + 
CallFunction("array_filter", {Datum(*values), Datum(*drop_null_filter)}, &options, + ctx)); + return result.make_array(); +} + +Result<std::shared_ptr<ChunkedArray>> DropNullChunkedArray(const ChunkedArray& values, + ExecContext* ctx) { + auto num_chunks = values.num_chunks(); + std::vector<std::shared_ptr<Array>> new_chunks(num_chunks); + for (int i = 0; i < num_chunks; i++) { + ARROW_ASSIGN_OR_RAISE(new_chunks[i], DropNullArray(values.chunk(i), ctx)); + } + return std::make_shared<ChunkedArray>(std::move(new_chunks)); +} + +Result<std::shared_ptr<RecordBatch>> DropNullRecordBatch(const RecordBatch& batch, + ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(auto dst, + AllocateEmptyBitmap(batch.num_rows(), ctx->memory_pool())); + BitUtil::SetBitsTo(dst->mutable_data(), 0, batch.num_rows(), true); + + for (int col_index = 0; col_index < batch.num_columns(); ++col_index) { + const auto& column = batch.column(col_index); + if (column->null_bitmap_data()) { + ::arrow::internal::BitmapAnd(column->null_bitmap_data(), column->offset(), + dst->data(), 0, column->length(), 0, + dst->mutable_data()); + } else { + for (int64_t i = 0; i < column->length(); ++i) { + if (!column->IsValid(i)) { + BitUtil::ClearBit(dst->mutable_data(), i); + } + } + } + } + auto drop_null_filter = + std::make_shared<BooleanArray>(batch.num_rows(), dst, nullptr, 0, 0); + + if (drop_null_filter->false_count() == batch.num_rows()) { + std::vector<std::shared_ptr<Array>> empty_batch(batch.num_columns()); + for (int i = 0; i < batch.num_columns(); i++) { + RETURN_NOT_OK( + CreateEmptyArray(batch.column(i)->type(), ctx->memory_pool(), &empty_batch[i])); + } + return RecordBatch::Make(batch.schema(), 0, empty_batch); + } + ARROW_ASSIGN_OR_RAISE(Datum result, Filter(Datum(batch), Datum(drop_null_filter), + FilterOptions::Defaults(), ctx)); + return result.record_batch(); +} + +Result<std::shared_ptr<Table>> DropNullTable(const Table& table, ExecContext* ctx) { + if (table.num_rows() == 0) { + return 
Table::Make(table.schema(), table.columns(), 0); + } + const int num_columns = table.num_columns(); + std::vector<ArrayVector> inputs(num_columns); + Review comment: We can have an early termination for tables as well, IMO (if none of the chunked arrays contain nulls, just return the input) ########## File path: cpp/src/arrow/compute/kernels/vector_selection.cc ########## @@ -2146,6 +2146,184 @@ class TakeMetaFunction : public MetaFunction { } }; +// ---------------------------------------------------------------------- +// DropNull Implementation + +Status GetDropNullFilter(const Array& values, MemoryPool* memory_pool, + std::shared_ptr<arrow::BooleanArray>* out_array) { + auto bitmap_buffer = values.null_bitmap(); + *out_array = std::make_shared<BooleanArray>(values.length(), bitmap_buffer, nullptr, 0, + values.offset()); + return Status::OK(); +} + +Status CreateEmptyArray(std::shared_ptr<DataType> type, MemoryPool* memory_pool, + std::shared_ptr<Array>* output_array) { + std::unique_ptr<ArrayBuilder> builder; + RETURN_NOT_OK(MakeBuilder(memory_pool, type, &builder)); + RETURN_NOT_OK(builder->Resize(0)); + ARROW_ASSIGN_OR_RAISE(*output_array, builder->Finish()); + return Status::OK(); +} + +Result<std::shared_ptr<Array>> DropNullArray(const std::shared_ptr<Array>& values, + ExecContext* ctx) { + if (values->null_count() == 0) { + return values; + } + if (values->type()->Equals(arrow::null())) { + return std::make_shared<NullArray>(0); + } + std::shared_ptr<BooleanArray> drop_null_filter; + RETURN_NOT_OK(GetDropNullFilter(*values, ctx->memory_pool(), &drop_null_filter)); + + if (drop_null_filter->null_count() == drop_null_filter->length()) { + std::shared_ptr<Array> empty_array; + RETURN_NOT_OK(CreateEmptyArray(values->type(), ctx->memory_pool(), &empty_array)); + return empty_array; + } + auto options = FilterOptions::Defaults(); + ARROW_ASSIGN_OR_RAISE( + Datum result, + CallFunction("array_filter", {Datum(*values), Datum(*drop_null_filter)}, &options, + ctx)); + return 
result.make_array(); +} + +Result<std::shared_ptr<ChunkedArray>> DropNullChunkedArray(const ChunkedArray& values, + ExecContext* ctx) { + auto num_chunks = values.num_chunks(); + std::vector<std::shared_ptr<Array>> new_chunks(num_chunks); + for (int i = 0; i < num_chunks; i++) { + ARROW_ASSIGN_OR_RAISE(new_chunks[i], DropNullArray(values.chunk(i), ctx)); + } + return std::make_shared<ChunkedArray>(std::move(new_chunks)); +} + +Result<std::shared_ptr<RecordBatch>> DropNullRecordBatch(const RecordBatch& batch, + ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(auto dst, + AllocateEmptyBitmap(batch.num_rows(), ctx->memory_pool())); + BitUtil::SetBitsTo(dst->mutable_data(), 0, batch.num_rows(), true); + + for (int col_index = 0; col_index < batch.num_columns(); ++col_index) { + const auto& column = batch.column(col_index); + if (column->null_bitmap_data()) { + ::arrow::internal::BitmapAnd(column->null_bitmap_data(), column->offset(), + dst->data(), 0, column->length(), 0, + dst->mutable_data()); + } else { + for (int64_t i = 0; i < column->length(); ++i) { + if (!column->IsValid(i)) { + BitUtil::ClearBit(dst->mutable_data(), i); + } + } + } + } + auto drop_null_filter = + std::make_shared<BooleanArray>(batch.num_rows(), dst, nullptr, 0, 0); + + if (drop_null_filter->false_count() == batch.num_rows()) { + std::vector<std::shared_ptr<Array>> empty_batch(batch.num_columns()); + for (int i = 0; i < batch.num_columns(); i++) { + RETURN_NOT_OK( + CreateEmptyArray(batch.column(i)->type(), ctx->memory_pool(), &empty_batch[i])); + } + return RecordBatch::Make(batch.schema(), 0, empty_batch); + } + ARROW_ASSIGN_OR_RAISE(Datum result, Filter(Datum(batch), Datum(drop_null_filter), + FilterOptions::Defaults(), ctx)); + return result.record_batch(); +} + +Result<std::shared_ptr<Table>> DropNullTable(const Table& table, ExecContext* ctx) { + if (table.num_rows() == 0) { + return Table::Make(table.schema(), table.columns(), 0); + } + const int num_columns = table.num_columns(); + 
std::vector<ArrayVector> inputs(num_columns); + + // Fetch table columns + for (int i = 0; i < num_columns; ++i) { + inputs[i] = table.column(i)->chunks(); + } + + ARROW_ASSIGN_OR_RAISE(auto dst, + AllocateEmptyBitmap(table.num_rows(), ctx->memory_pool())); + BitUtil::SetBitsTo(dst->mutable_data(), 0, table.num_rows(), true); + // Note: Not all chunks has null_bitmap data, so we are using IsValid method + for (int col = 0; col < num_columns; ++col) { + int64_t relative_index = 0; + for (int64_t chunk_index = 0; chunk_index < static_cast<int64_t>(inputs[col].size()); + ++chunk_index) { + const auto& column_chunk = inputs[col][chunk_index]; + for (int64_t i = 0; i < column_chunk->length(); ++i) { + if (!column_chunk->IsValid(i)) { + BitUtil::ClearBit(dst->mutable_data(), relative_index + i); Review comment: I think you can use the `BitmapAnd` approach you used in record batches here. Only thing is you'll have to manipulate the offsets and lengths properly :slightly_smiling_face: ########## File path: cpp/src/arrow/compute/kernels/vector_selection.cc ########## @@ -2146,6 +2146,184 @@ class TakeMetaFunction : public MetaFunction { } }; +// ---------------------------------------------------------------------- +// DropNull Implementation + +Status GetDropNullFilter(const Array& values, MemoryPool* memory_pool, + std::shared_ptr<arrow::BooleanArray>* out_array) { + auto bitmap_buffer = values.null_bitmap(); + *out_array = std::make_shared<BooleanArray>(values.length(), bitmap_buffer, nullptr, 0, + values.offset()); + return Status::OK(); +} + +Status CreateEmptyArray(std::shared_ptr<DataType> type, MemoryPool* memory_pool, + std::shared_ptr<Array>* output_array) { + std::unique_ptr<ArrayBuilder> builder; + RETURN_NOT_OK(MakeBuilder(memory_pool, type, &builder)); + RETURN_NOT_OK(builder->Resize(0)); + ARROW_ASSIGN_OR_RAISE(*output_array, builder->Finish()); + return Status::OK(); +} + +Result<std::shared_ptr<Array>> DropNullArray(const std::shared_ptr<Array>& values, + 
ExecContext* ctx) { + if (values->null_count() == 0) { + return values; + } + if (values->type()->Equals(arrow::null())) { + return std::make_shared<NullArray>(0); + } + std::shared_ptr<BooleanArray> drop_null_filter; + RETURN_NOT_OK(GetDropNullFilter(*values, ctx->memory_pool(), &drop_null_filter)); + + if (drop_null_filter->null_count() == drop_null_filter->length()) { + std::shared_ptr<Array> empty_array; + RETURN_NOT_OK(CreateEmptyArray(values->type(), ctx->memory_pool(), &empty_array)); + return empty_array; + } + auto options = FilterOptions::Defaults(); + ARROW_ASSIGN_OR_RAISE( + Datum result, + CallFunction("array_filter", {Datum(*values), Datum(*drop_null_filter)}, &options, + ctx)); + return result.make_array(); +} + +Result<std::shared_ptr<ChunkedArray>> DropNullChunkedArray(const ChunkedArray& values, + ExecContext* ctx) { + auto num_chunks = values.num_chunks(); + std::vector<std::shared_ptr<Array>> new_chunks(num_chunks); + for (int i = 0; i < num_chunks; i++) { + ARROW_ASSIGN_OR_RAISE(new_chunks[i], DropNullArray(values.chunk(i), ctx)); + } + return std::make_shared<ChunkedArray>(std::move(new_chunks)); +} + +Result<std::shared_ptr<RecordBatch>> DropNullRecordBatch(const RecordBatch& batch, + ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(auto dst, + AllocateEmptyBitmap(batch.num_rows(), ctx->memory_pool())); + BitUtil::SetBitsTo(dst->mutable_data(), 0, batch.num_rows(), true); + + for (int col_index = 0; col_index < batch.num_columns(); ++col_index) { + const auto& column = batch.column(col_index); + if (column->null_bitmap_data()) { + ::arrow::internal::BitmapAnd(column->null_bitmap_data(), column->offset(), + dst->data(), 0, column->length(), 0, + dst->mutable_data()); + } else { + for (int64_t i = 0; i < column->length(); ++i) { + if (!column->IsValid(i)) { + BitUtil::ClearBit(dst->mutable_data(), i); + } Review comment: I think this else block is redundant. `column->null_bitmap_data() == nullptr` means that all are valid. 
So we can omit this branch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org