aocsa commented on a change in pull request #10802: URL: https://github.com/apache/arrow/pull/10802#discussion_r680652255
########## File path: cpp/src/arrow/compute/kernels/vector_selection.cc ########## @@ -2146,6 +2147,144 @@ class TakeMetaFunction : public MetaFunction { } }; +// ---------------------------------------------------------------------- +// DropNull Implementation + +Result<std::shared_ptr<arrow::Array>> GetNotNullIndices( + const std::shared_ptr<Array>& column, MemoryPool* memory_pool) { + std::shared_ptr<arrow::Array> indices; + arrow::NumericBuilder<arrow::Int32Type> builder(memory_pool); + builder.Reserve(column->length() - column->null_count()); + + std::vector<int32_t> values; + for (int64_t i = 0; i < column->length(); i++) { + if (column->IsValid(i)) { + builder.UnsafeAppend(static_cast<int32_t>(i)); + } + } + RETURN_NOT_OK(builder.Finish(&indices)); + return indices; +} + +Result<std::shared_ptr<arrow::Array>> GetNotNullIndices( + const std::shared_ptr<ChunkedArray>& chunks, MemoryPool* memory_pool) { + std::shared_ptr<arrow::Array> indices; + arrow::NumericBuilder<arrow::Int32Type> builder(memory_pool); + builder.Reserve(chunks->length() - chunks->null_count()); + int64_t relative_index = 0; + for (int64_t chunk_index = 0; chunk_index < chunks->num_chunks(); ++chunk_index) { + auto column_chunk = chunks->chunk(chunk_index); + for (int64_t col_index = 0; col_index < column_chunk->length(); col_index++) { + if (column_chunk->IsValid(col_index)) { + builder.UnsafeAppend(static_cast<int32_t>(relative_index + col_index)); + } + } + relative_index += column_chunk->length(); + } + RETURN_NOT_OK(builder.Finish(&indices)); + return indices; +} + +Result<std::shared_ptr<RecordBatch>> DropNullRecordBatch(const RecordBatch& batch, + ExecContext* ctx) { + int64_t length_count = 0; + std::vector<std::shared_ptr<Array>> columns(batch.num_columns()); + for (int i = 0; i < batch.num_columns(); ++i) { + ARROW_ASSIGN_OR_RAISE(auto indices, + GetNotNullIndices(batch.column(i), ctx->memory_pool())); + ARROW_ASSIGN_OR_RAISE(Datum out, Take(batch.column(i)->data(), Datum(indices), + TakeOptions::NoBoundsCheck(), ctx)); + columns[i] = out.make_array(); + length_count += columns[i]->length(); + } + return RecordBatch::Make(batch.schema(), length_count, std::move(columns)); +} + +Result<std::shared_ptr<Table>> DropNullTable(const Table& table, ExecContext* ctx) { + if (table.num_rows() == 0) { Review comment: Sure makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org