mathyingzhou commented on a change in pull request #8648: URL: https://github.com/apache/arrow/pull/8648#discussion_r594472371
########## File path: cpp/src/arrow/adapters/orc/adapter_util.cc ########## @@ -316,10 +326,525 @@ Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch, } } +template <class array_type, class batch_type> +Status FillNumericBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowOffset, int64_t& orcOffset, int64_t length, + Array* parray, std::vector<bool>* incomingMask) { + auto array = checked_cast<array_type*>(parray); + auto batch = checked_cast<batch_type*>(cbatch); + int64_t arrowLength = array->length(); + if (!arrowLength) { + return Status::OK(); + } + if (array->null_count() || incomingMask) { + batch->hasNulls = true; + } + for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) { + if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) { + batch->notNull[orcOffset] = false; + } else { + batch->data[orcOffset] = array->Value(arrowOffset); + batch->notNull[orcOffset] = true; + } + } + batch->numElements = orcOffset; + return Status::OK(); +} + +template <class array_type, class batch_type, class target_type> +Status FillNumericBatchCast(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowOffset, int64_t& orcOffset, int64_t length, + Array* parray, std::vector<bool>* incomingMask) { + auto array = checked_cast<array_type*>(parray); + auto batch = checked_cast<batch_type*>(cbatch); + int64_t arrowLength = array->length(); + if (!arrowLength) { + return Status::OK(); + } + if (array->null_count() || incomingMask) { + batch->hasNulls = true; + } + for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) { + if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) { + batch->notNull[orcOffset] = false; + } else { + batch->data[orcOffset] = static_cast<target_type>(array->Value(arrowOffset)); + batch->notNull[orcOffset] = true; + } + } + batch->numElements = orcOffset; + return Status::OK(); +} + +Status FillDate64Batch(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowOffset, int64_t& orcOffset, int64_t length, + Array* parray, std::vector<bool>* incomingMask) { + auto array = checked_cast<Date64Array*>(parray); + auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch); + int64_t arrowLength = array->length(); + if (!arrowLength) { + return Status::OK(); + } + if (array->null_count() || incomingMask) { + batch->hasNulls = true; + } + for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) { + if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) { + batch->notNull[orcOffset] = false; + } else { + int64_t miliseconds = array->Value(arrowOffset); + batch->data[orcOffset] = + static_cast<int64_t>(std::floor(miliseconds / kOneSecondMillis)); + batch->nanoseconds[orcOffset] = + (miliseconds - kOneSecondMillis * batch->data[orcOffset]) * kOneMilliNanos; + batch->notNull[orcOffset] = true; + } + } + batch->numElements = orcOffset; + return Status::OK(); +} + +Status FillTimestampBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowOffset, int64_t& orcOffset, int64_t length, + Array* parray, std::vector<bool>* incomingMask) { + auto array = checked_cast<TimestampArray*>(parray); + auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch); + int64_t arrowLength = array->length(); + if (!arrowLength) { + return Status::OK(); + } + if (array->null_count() || incomingMask) { + batch->hasNulls = true; + } + for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) { + if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) { + batch->notNull[orcOffset] = false; + } else { + int64_t data = array->Value(arrowOffset); + batch->notNull[orcOffset] = true; + switch (std::static_pointer_cast<TimestampType>(array->type())->unit()) { + case TimeUnit::type::SECOND: { + batch->data[orcOffset] = data; + batch->nanoseconds[orcOffset] = 0; + break; + } + case TimeUnit::type::MILLI: { + batch->data[orcOffset] = + static_cast<int64_t>(std::floor(data / kOneSecondMillis)); + batch->nanoseconds[orcOffset] = + (data - kOneSecondMillis * batch->data[orcOffset]) * kOneMilliNanos; + break; + } + case TimeUnit::type::MICRO: { + batch->data[orcOffset] = + static_cast<int64_t>(std::floor(data / kOneSecondMicros)); + batch->nanoseconds[orcOffset] = + (data - kOneSecondMicros * batch->data[orcOffset]) * kOneMicroNanos; + break; + } + default: { + batch->data[orcOffset] = + static_cast<int64_t>(std::floor(data / kOneSecondNanos)); + batch->nanoseconds[orcOffset] = data - kOneSecondNanos * batch->data[orcOffset]; + } + } + } + } + batch->numElements = orcOffset; + return Status::OK(); +} + +template <class array_type> +Status FillStringBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowOffset, int64_t& orcOffset, int64_t length, + Array* parray, std::vector<bool>* incomingMask) { + auto array = checked_cast<array_type*>(parray); + auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch); + int64_t arrowLength = array->length(); + if (!arrowLength) { + return Status::OK(); + } + if (array->null_count() || incomingMask) { + batch->hasNulls = true; + } + for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) { + if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) { + batch->notNull[orcOffset] = false; + } else { + batch->notNull[orcOffset] = true; + std::string dataString = array->GetString(arrowOffset); + int dataStringLength = dataString.length(); + if (batch->data[orcOffset]) delete batch->data[orcOffset]; + batch->data[orcOffset] = new char[dataStringLength + 1]; // Include null + memcpy(batch->data[orcOffset], dataString.c_str(), dataStringLength + 1); + batch->length[orcOffset] = dataStringLength; + } + } + batch->numElements = orcOffset; + return Status::OK(); +} + +template <class array_type, class offset_type> +Status FillBinaryBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowOffset, int64_t& orcOffset, int64_t length, + Array* parray, std::vector<bool>* incomingMask) { + auto array = checked_cast<array_type*>(parray); + auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch); + int64_t arrowLength = array->length(); + if (!arrowLength) { + return Status::OK(); + } + if (array->null_count() || incomingMask) { + batch->hasNulls = true; + } + for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) { + if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) { + batch->notNull[orcOffset] = false; + } else { + batch->notNull[orcOffset] = true; + offset_type dataLength = 0; + const uint8_t* data = array->GetValue(arrowOffset, &dataLength); + if (batch->data[orcOffset]) delete batch->data[orcOffset]; + batch->data[orcOffset] = new char[dataLength]; // Do not include null + memcpy(batch->data[orcOffset], data, dataLength); + batch->length[orcOffset] = dataLength; + } + } + batch->numElements = orcOffset; + return Status::OK(); +} + +Status FillFixedSizeBinaryBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowOffset, int64_t& orcOffset, int64_t length, + Array* parray, std::vector<bool>* incomingMask) { + auto array = checked_cast<FixedSizeBinaryArray*>(parray); + auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch); + int64_t arrowLength = array->length(); + if (!arrowLength) { + return Status::OK(); + } + int32_t byteWidth = array->byte_width(); + if (array->null_count() || incomingMask) { + batch->hasNulls = true; + } + for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) { + if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) { + batch->notNull[orcOffset] = false; + } else { + batch->notNull[orcOffset] = true; + const uint8_t* data = array->GetValue(arrowOffset); + if (batch->data[orcOffset]) delete batch->data[orcOffset]; + batch->data[orcOffset] = new char[byteWidth]; // Do not include null + memcpy(batch->data[orcOffset], data, byteWidth); + batch->length[orcOffset] = byteWidth; + } + } + batch->numElements = orcOffset; + return Status::OK(); +} + +// If Arrow supports 256-bit decimals we can not support it unless ORC does it +Status FillDecimalBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowOffset, int64_t& orcOffset, int64_t length, + Array* parray, std::vector<bool>* incomingMask) { + auto array = checked_cast<Decimal128Array*>(parray); + auto batch = checked_cast<liborc::Decimal128VectorBatch*>(cbatch); + // Arrow uses 128 bits for decimal type and in the future, 256 bits will also be + // supported. + int64_t arrowLength = array->length(); + if (!arrowLength) { + return Status::OK(); + } + if (array->null_count() || incomingMask) { + batch->hasNulls = true; + } + for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) { + if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) { + batch->notNull[orcOffset] = false; + } else { + batch->notNull[orcOffset] = true; + uint8_t* rawInt128 = const_cast<uint8_t*>(array->GetValue(arrowOffset)); + uint64_t* lowerBits = reinterpret_cast<uint64_t*>(rawInt128); + int64_t* higherBits = reinterpret_cast<int64_t*>(rawInt128 + 8); + batch->values[orcOffset] = liborc::Int128(*higherBits, *lowerBits); + } + } + batch->numElements = orcOffset; + return Status::OK(); +} + +Status FillStructBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowOffset, int64_t& orcOffset, int64_t length, + Array* parray, std::vector<bool>* incomingMask) { + auto array = checked_cast<StructArray*>(parray); + auto batch = checked_cast<liborc::StructVectorBatch*>(cbatch); + std::shared_ptr<std::vector<bool>> outgoingMask; + std::size_t size = type->fields().size(); + int64_t arrowLength = array->length(); + if (!arrowLength) { + return Status::OK(); + } + int64_t initORCOffset = orcOffset; + int64_t initArrowOffset = arrowOffset; + // First fill fields of ColumnVectorBatch + if (array->null_count() || incomingMask) { + batch->hasNulls = true; + outgoingMask = std::make_shared<std::vector<bool>>(length, true); + } else { + outgoingMask = NULLPTR; + } + for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) { + if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) { + batch->notNull[orcOffset] = false; + (*outgoingMask)[orcOffset] = false; + } else { + batch->notNull[orcOffset] = true; + } + } + batch->numElements += orcOffset - initORCOffset; + // Fill the fields + for (std::size_t i = 0; i < size; i++) { + orcOffset = initORCOffset; + arrowOffset = initArrowOffset; + RETURN_NOT_OK(FillBatch(type->field(i)->type().get(), batch->fields[i], arrowOffset, + orcOffset, length, array->field(i).get(), + outgoingMask.get())); + } + return Status::OK(); +} + +template <class array_type> +Status FillListBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowOffset, int64_t& orcOffset, int64_t length, + Array* parray, std::vector<bool>* incomingMask) { + auto array = checked_cast<array_type*>(parray); + auto batch = checked_cast<liborc::ListVectorBatch*>(cbatch); + liborc::ColumnVectorBatch* elementBatch = (batch->elements).get(); + DataType* elementType = array->value_type().get(); + int64_t arrowLength = array->length(); + if (!arrowLength) { + return Status::OK(); + } + if (orcOffset == 0) { + batch->offsets[0] = 0; + } + if (array->null_count() || incomingMask) { + batch->hasNulls = true; + } + for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) { + if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) { + batch->notNull[orcOffset] = false; + batch->offsets[orcOffset + 1] = batch->offsets[orcOffset]; + } else { + batch->notNull[orcOffset] = true; + batch->offsets[orcOffset + 1] = batch->offsets[orcOffset] + + array->value_offset(arrowOffset + 1) - + array->value_offset(arrowOffset); + elementBatch->resize(batch->offsets[orcOffset + 1]); + int64_t subarrayArrowOffset = array->value_offset(arrowOffset), + subarrayORCOffset = batch->offsets[orcOffset], + subarrayORCLength = batch->offsets[orcOffset + 1]; + RETURN_NOT_OK(FillBatch(elementType, elementBatch, subarrayArrowOffset, + subarrayORCOffset, subarrayORCLength, array->values().get(), + NULLPTR)); + } + } + batch->numElements = orcOffset; + return Status::OK(); +} + +Status FillFixedSizeListBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowOffset, int64_t& orcOffset, int64_t length, + Array* parray, std::vector<bool>* incomingMask) { + auto array = checked_cast<FixedSizeListArray*>(parray); + auto batch = checked_cast<liborc::ListVectorBatch*>(cbatch); + liborc::ColumnVectorBatch* elementBatch = (batch->elements).get(); + DataType* elementType = array->value_type().get(); + int64_t arrowLength = array->length(); + int32_t elementLength = array->value_length(); // Fixed length of each subarray + if (!arrowLength) { + return Status::OK(); + } + if (orcOffset == 0) { + batch->offsets[0] = 0; + } + if (array->null_count() || incomingMask) { + batch->hasNulls = true; + } + for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) { + if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) { + batch->notNull[orcOffset] = false; + batch->offsets[orcOffset + 1] = batch->offsets[orcOffset]; + } else { + batch->notNull[orcOffset] = true; + batch->offsets[orcOffset + 1] = batch->offsets[orcOffset] + elementLength; + int64_t subarrayArrowOffset = array->value_offset(arrowOffset), + subarrayORCOffset = batch->offsets[orcOffset], + subarrayORCLength = batch->offsets[orcOffset + 1]; + elementBatch->resize(subarrayORCLength); + RETURN_NOT_OK(FillBatch(elementType, elementBatch, subarrayArrowOffset, + subarrayORCOffset, subarrayORCLength, array->values().get(), + NULLPTR)); + } + } + batch->numElements = orcOffset; + return Status::OK(); +} + +Status FillMapBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowOffset, int64_t& orcOffset, int64_t length, + Array* parray, std::vector<bool>* incomingMask) { + auto array = checked_cast<MapArray*>(parray); + auto batch = checked_cast<liborc::MapVectorBatch*>(cbatch); + liborc::ColumnVectorBatch* keyBatch = (batch->keys).get(); + liborc::ColumnVectorBatch* elementBatch = (batch->elements).get(); + Array* keyArray = array->keys().get(); + Array* elementArray = array->items().get(); + DataType* keyType = keyArray->type().get(); + DataType* elementType = elementArray->type().get(); + int64_t arrowLength = array->length(); + if (!arrowLength) { + return Status::OK(); + } + if (orcOffset == 0) { + batch->offsets[0] = 0; + } + if (array->null_count() || incomingMask) { + batch->hasNulls = true; + } + for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) { + if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) { + batch->notNull[orcOffset] = false; + batch->offsets[orcOffset + 1] = batch->offsets[orcOffset]; + } else { + batch->notNull[orcOffset] = true; + batch->offsets[orcOffset + 1] = batch->offsets[orcOffset] + + array->value_offset(arrowOffset + 1) - + array->value_offset(arrowOffset); + int64_t subarrayArrowOffset = array->value_offset(arrowOffset), + subarrayORCOffset = batch->offsets[orcOffset], + subarrayORCLength = batch->offsets[orcOffset + 1], + initSubarrayArrowOffset = subarrayArrowOffset, + initSubarrayORCOffset = subarrayORCOffset; + keyBatch->resize(subarrayORCLength); + elementBatch->resize(subarrayORCLength); + RETURN_NOT_OK(FillBatch(keyType, keyBatch, subarrayArrowOffset, subarrayORCOffset, + subarrayORCLength, keyArray, NULLPTR)); + subarrayArrowOffset = initSubarrayArrowOffset; + subarrayORCOffset = initSubarrayORCOffset; + RETURN_NOT_OK(FillBatch(elementType, elementBatch, subarrayArrowOffset, + subarrayORCOffset, subarrayORCLength, elementArray, + NULLPTR)); + } + } + batch->numElements = orcOffset; + return Status::OK(); +} + +Status FillBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowOffset, int64_t& orcOffset, int64_t length, Array* parray, + std::vector<bool>* incomingMask) { + Type::type kind = type->id(); + switch (kind) { + case Type::type::BOOL: + return FillNumericBatchCast<BooleanArray, liborc::LongVectorBatch, int64_t>( + type, cbatch, arrowOffset, orcOffset, length, parray, incomingMask); + case Type::type::INT8: + return FillNumericBatchCast<NumericArray<arrow::Int8Type>, liborc::LongVectorBatch, + int64_t>(type, cbatch, arrowOffset, orcOffset, length, + parray, incomingMask); + case Type::type::INT16: + return FillNumericBatchCast<NumericArray<arrow::Int16Type>, liborc::LongVectorBatch, + int64_t>(type, cbatch, arrowOffset, orcOffset, length, + parray, incomingMask); + case Type::type::INT32: + return FillNumericBatchCast<NumericArray<arrow::Int32Type>, liborc::LongVectorBatch, + int64_t>(type, cbatch, arrowOffset, orcOffset, length, + parray, incomingMask); + case Type::type::INT64: + return FillNumericBatch<NumericArray<arrow::Int64Type>, liborc::LongVectorBatch>( + type, cbatch, arrowOffset, orcOffset, length, parray, incomingMask); + case Type::type::FLOAT: + return FillNumericBatchCast<NumericArray<arrow::FloatType>, + liborc::DoubleVectorBatch, double>( + type, cbatch, arrowOffset, orcOffset, length, parray, incomingMask); + case Type::type::DOUBLE: + return FillNumericBatch<NumericArray<arrow::DoubleType>, liborc::DoubleVectorBatch>( + type, cbatch, arrowOffset, orcOffset, length, parray, incomingMask); + case Type::type::BINARY: + return FillBinaryBatch<BinaryArray, int32_t>(type, cbatch, arrowOffset, orcOffset, + length, parray, incomingMask); + case Type::type::LARGE_BINARY: + return FillBinaryBatch<LargeBinaryArray, int64_t>( + type, cbatch, arrowOffset, orcOffset, length, parray, incomingMask); + case Type::type::STRING: + return FillStringBatch<StringArray>(type, cbatch, arrowOffset, orcOffset, length, + parray, incomingMask); + case Type::type::LARGE_STRING: + return FillStringBatch<LargeStringArray>(type, cbatch, arrowOffset, orcOffset, + length, parray, incomingMask); + case Type::type::FIXED_SIZE_BINARY: + return FillFixedSizeBinaryBatch(type, cbatch, arrowOffset, orcOffset, length, + parray, incomingMask); + case Type::type::DATE32: + return FillNumericBatchCast<NumericArray<arrow::Date32Type>, + liborc::LongVectorBatch, int64_t>( + type, cbatch, arrowOffset, orcOffset, length, parray, incomingMask); + case Type::type::DATE64: + return FillDate64Batch(type, cbatch, arrowOffset, orcOffset, length, parray, + incomingMask); + case Type::type::TIMESTAMP: + return FillTimestampBatch(type, cbatch, arrowOffset, orcOffset, length, parray, + incomingMask); + case Type::type::DECIMAL: + return FillDecimalBatch(type, cbatch, arrowOffset, orcOffset, length, parray, + incomingMask); + case Type::type::STRUCT: + return FillStructBatch(type, cbatch, arrowOffset, orcOffset, length, parray, + incomingMask); + case Type::type::LIST: + return FillListBatch<ListArray>(type, cbatch, arrowOffset, orcOffset, length, + parray, incomingMask); + case Type::type::LARGE_LIST: + return FillListBatch<LargeListArray>(type, cbatch, arrowOffset, orcOffset, length, + parray, incomingMask); + case Type::type::FIXED_SIZE_LIST: + return FillFixedSizeListBatch(type, cbatch, arrowOffset, orcOffset, length, parray, + incomingMask); + case Type::type::MAP: + return FillMapBatch(type, cbatch, arrowOffset, orcOffset, length, parray, + incomingMask); + default: { + return Status::Invalid("Unknown or unsupported Arrow type kind: ", kind); + } + } + return Status::OK(); +} + +Status FillBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch, + int64_t& arrowIndexOffset, int& arrowChunkOffset, int64_t length, + ChunkedArray* pchunkedArray) { + int numBatch = pchunkedArray->num_chunks(); + int64_t orcOffset = 0; + Status st; + while (arrowChunkOffset < numBatch && orcOffset < length) { + st = FillBatch(type, cbatch, arrowIndexOffset, orcOffset, length, + pchunkedArray->chunk(arrowChunkOffset).get(), NULLPTR); + if (!st.ok()) { + return st; + } + if (arrowChunkOffset < numBatch && orcOffset < length) { + arrowIndexOffset = 0; + arrowChunkOffset++; + } + } + return Status::OK(); +} + Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out) { - // When subselecting fields on read, liborc will set some nodes to nullptr, - // so we need to check for nullptr before progressing - if (type == nullptr) { + // When subselecting fields on read, liborc will set some nodes to NULLPTR, + // so we need to check for NULLPTR before progressing + if (type == NULLPTR) { Review comment: I have ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org