mathyingzhou commented on a change in pull request #8648:
URL: https://github.com/apache/arrow/pull/8648#discussion_r574939867



##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -315,13 +342,462 @@ Status AppendBatch(const liborc::Type* type, 
liborc::ColumnVectorBatch* batch,
       return Status::NotImplemented("Not implemented type kind: ", kind);
   }
 }
+}  // namespace orc
+}  // namespace adapters
+}  // namespace arrow
+
+namespace {
+
+using arrow::internal::checked_cast;
+
+// Forward declaration of the type dispatcher defined further down. It is needed
+// because the struct, list and map writers below call WriteBatch recursively on
+// their child arrays. incoming_mask defaults to NULLPTR, meaning that no parent
+// supplied a null mask.
+arrow::Status WriteBatch(liborc::ColumnVectorBatch* column_vector_batch,
+                         int64_t* arrow_offset, int64_t* orc_offset,
+                         const int64_t& length, const arrow::Array& parray,
+                         const std::vector<bool>* incoming_mask = NULLPTR);
+
+// incoming_mask is only ever supplied by WriteStructBatch. The reason is that ORC is
+// much stricter than Arrow in terms of consistency: if a struct scalar is null, all
+// of its children must be set to null as well or ORC is not going to function
+// properly. incoming_mask exists to pass the null status of a struct on to its
+// children.
+//
+// static_cast from int64_t or double to itself shouldn't introduce overhead.
+// Please see
+// https://stackoverflow.com/questions/19106826/
+// can-static-cast-to-same-type-introduce-runtime-overhead
+// Copies values of a numeric Arrow array into an ORC vector batch.
+//
+// Copying starts at (*arrow_offset, *orc_offset) and continues until the ORC batch
+// holds `length` entries or the Arrow array is exhausted; both offsets are advanced
+// in place. A slot is marked null when the Arrow value is null, or when
+// incoming_mask is given and holds false at that ORC position. Each non-null value
+// is converted to TargetType before being stored.
+template <class ArrayType, class BatchType, class TargetType>
+arrow::Status WriteNumericBatch(liborc::ColumnVectorBatch* column_vector_batch,
+                                int64_t* arrow_offset, int64_t* orc_offset,
+                                const int64_t& length, const arrow::Array& array,
+                                const std::vector<bool>* incoming_mask) {
+  const auto& typed_array = checked_cast<const ArrayType&>(array);
+  auto* typed_batch = checked_cast<BatchType*>(column_vector_batch);
+  const int64_t num_arrow_values = array.length();
+  if (num_arrow_values == 0) {
+    return arrow::Status::OK();
+  }
+  if (array.null_count() != 0 || incoming_mask != nullptr) {
+    typed_batch->hasNulls = true;
+  }
+  int64_t& orc_pos = *orc_offset;
+  int64_t& arrow_pos = *arrow_offset;
+  while (orc_pos < length && arrow_pos < num_arrow_values) {
+    // Valid only if Arrow has a value and the parent (if any) did not mask it out.
+    const bool is_valid = !array.IsNull(arrow_pos) &&
+                          (incoming_mask == nullptr || (*incoming_mask)[orc_pos]);
+    if (is_valid) {
+      typed_batch->data[orc_pos] =
+          static_cast<TargetType>(typed_array.Value(arrow_pos));
+      typed_batch->notNull[orc_pos] = true;
+    } else {
+      typed_batch->notNull[orc_pos] = false;
+    }
+    ++orc_pos;
+    ++arrow_pos;
+  }
+  typed_batch->numElements = orc_pos;
+  return arrow::Status::OK();
+}
+
+// Copies values of an Arrow timestamp-like array into an ORC TimestampVectorBatch,
+// splitting every value into whole seconds (batch->data) and a sub-second part
+// expressed in nanoseconds (batch->nanoseconds).
+//
+// conversion_factor_from_second is the number of Arrow units per second and
+// conversion_factor_to_nano the number of nanoseconds per Arrow unit (see the call
+// sites in WriteBatch). Both are assumed to be positive.
+//
+// Copying starts at (*arrow_offset, *orc_offset) and continues until the ORC batch
+// holds `length` entries or the Arrow array is exhausted; both offsets are advanced
+// in place. A slot is marked null when the Arrow value is null, or when
+// incoming_mask is given and holds false at that ORC position.
+template <class ArrayType>
+arrow::Status WriteTimestampBatch(liborc::ColumnVectorBatch* column_vector_batch,
+                                  int64_t* arrow_offset, int64_t* orc_offset,
+                                  const int64_t& length, const arrow::Array& array,
+                                  const std::vector<bool>* incoming_mask,
+                                  const int64_t& conversion_factor_from_second,
+                                  const int64_t& conversion_factor_to_nano) {
+  const ArrayType& timestamp_array(checked_cast<const ArrayType&>(array));
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(column_vector_batch);
+  int64_t arrow_length = array.length();
+  if (!arrow_length) {
+    return arrow::Status::OK();
+  }
+  if (array.null_count() || incoming_mask) {
+    batch->hasNulls = true;
+  }
+  for (; *orc_offset < length && *arrow_offset < arrow_length;
+       (*orc_offset)++, (*arrow_offset)++) {
+    if (array.IsNull(*arrow_offset) ||
+        (incoming_mask && !(*incoming_mask)[*orc_offset])) {
+      batch->notNull[*orc_offset] = false;
+    } else {
+      const int64_t data = timestamp_array.Value(*arrow_offset);
+      batch->notNull[*orc_offset] = true;
+      // Integer floor division. C++ integer division truncates toward zero, so
+      // applying std::floor to the quotient had no effect: values before the epoch
+      // were rounded up and ended up with a negative nanosecond part. Staying in
+      // integer arithmetic also avoids losing precision through double.
+      int64_t seconds = data / conversion_factor_from_second;
+      int64_t subsecond = data % conversion_factor_from_second;
+      if (subsecond < 0) {
+        --seconds;
+        subsecond += conversion_factor_from_second;
+      }
+      batch->data[*orc_offset] = seconds;
+      batch->nanoseconds[*orc_offset] = subsecond * conversion_factor_to_nano;
+    }
+  }
+  batch->numElements = *orc_offset;
+  return arrow::Status::OK();
+}
+
+// Copies variable-length binary/string values of an Arrow array into an ORC
+// StringVectorBatch.
+//
+// Copying starts at (*arrow_offset, *orc_offset) and continues until the ORC batch
+// holds `length` entries or the Arrow array is exhausted; both offsets are advanced
+// in place. A slot is marked null when the Arrow value is null, or when
+// incoming_mask is given and holds false at that ORC position.
+//
+// Every non-null slot receives a freshly allocated buffer of exactly data_length
+// bytes (no trailing NUL); a buffer already stored in the slot is released first.
+// NOTE(review): nothing visible here releases the buffers that remain in the batch
+// after the last write, and this assumes unused slots hold a null pointer --
+// confirm against the code that owns the batch.
+template <class ArrayType, class OffsetType>
+arrow::Status WriteBinaryBatch(liborc::ColumnVectorBatch* column_vector_batch,
+                               int64_t* arrow_offset, int64_t* orc_offset,
+                               const int64_t& length, const arrow::Array& array,
+                               const std::vector<bool>* incoming_mask) {
+  const ArrayType& binary_array(checked_cast<const ArrayType&>(array));
+  auto batch = checked_cast<liborc::StringVectorBatch*>(column_vector_batch);
+  int64_t arrow_length = array.length();
+  if (!arrow_length) {
+    return arrow::Status::OK();
+  }
+  if (array.null_count() || incoming_mask) {
+    batch->hasNulls = true;
+  }
+  for (; *orc_offset < length && *arrow_offset < arrow_length;
+       (*orc_offset)++, (*arrow_offset)++) {
+    if (array.IsNull(*arrow_offset) ||
+        (incoming_mask && !(*incoming_mask)[*orc_offset])) {
+      batch->notNull[*orc_offset] = false;
+    } else {
+      batch->notNull[*orc_offset] = true;
+      OffsetType data_length = 0;
+      const uint8_t* data = binary_array.GetValue(*arrow_offset, &data_length);
+      // The buffer comes from new char[], so it must be released with delete[];
+      // scalar delete on an array allocation is undefined behaviour. delete[] on a
+      // null pointer is a no-op, so no explicit null check is required.
+      delete[] batch->data[*orc_offset];
+      batch->data[*orc_offset] = new char[data_length];  // Do not include null
+      memcpy(batch->data[*orc_offset], data, data_length);
+      batch->length[*orc_offset] = data_length;
+    }
+  }
+  batch->numElements = *orc_offset;
+  return arrow::Status::OK();
+}
+
+// Copies fixed-width binary values of an Arrow FixedSizeBinaryArray into an ORC
+// StringVectorBatch. Every non-null value is byte_width() bytes long.
+//
+// Copying starts at (*arrow_offset, *orc_offset) and continues until the ORC batch
+// holds `length` entries or the Arrow array is exhausted; both offsets are advanced
+// in place. A slot is marked null when the Arrow value is null, or when
+// incoming_mask is given and holds false at that ORC position.
+//
+// Every non-null slot receives a freshly allocated buffer (no trailing NUL); a
+// buffer already stored in the slot is released first.
+// NOTE(review): nothing visible here releases the buffers that remain in the batch
+// after the last write, and this assumes unused slots hold a null pointer --
+// confirm against the code that owns the batch.
+arrow::Status WriteFixedSizeBinaryBatch(liborc::ColumnVectorBatch* column_vector_batch,
+                                        int64_t* arrow_offset, int64_t* orc_offset,
+                                        const int64_t& length,
+                                        const arrow::Array& array,
+                                        const std::vector<bool>* incoming_mask) {
+  const arrow::FixedSizeBinaryArray& fixed_size_binary_array(
+      checked_cast<const arrow::FixedSizeBinaryArray&>(array));
+  auto batch = checked_cast<liborc::StringVectorBatch*>(column_vector_batch);
+  int64_t arrow_length = array.length();
+  if (!arrow_length) {
+    return arrow::Status::OK();
+  }
+  const int32_t data_length = fixed_size_binary_array.byte_width();
+  if (array.null_count() || incoming_mask) {
+    batch->hasNulls = true;
+  }
+  for (; *orc_offset < length && *arrow_offset < arrow_length;
+       (*orc_offset)++, (*arrow_offset)++) {
+    if (array.IsNull(*arrow_offset) ||
+        (incoming_mask && !(*incoming_mask)[*orc_offset])) {
+      batch->notNull[*orc_offset] = false;
+    } else {
+      batch->notNull[*orc_offset] = true;
+      const uint8_t* data = fixed_size_binary_array.GetValue(*arrow_offset);
+      // The buffer comes from new char[], so it must be released with delete[];
+      // scalar delete on an array allocation is undefined behaviour. delete[] on a
+      // null pointer is a no-op, so no explicit null check is required.
+      delete[] batch->data[*orc_offset];
+      batch->data[*orc_offset] = new char[data_length];  // Do not include null
+      memcpy(batch->data[*orc_offset], data, data_length);
+      batch->length[*orc_offset] = data_length;
+    }
+  }
+  batch->numElements = *orc_offset;
+  return arrow::Status::OK();
+}
+
+// Copies values of an Arrow Decimal128Array into an ORC Decimal128VectorBatch.
+//
+// Arrow uses 128 bits for this decimal type. Should Arrow gain 256-bit decimals,
+// they cannot be supported here unless ORC supports them as well.
+//
+// Copying starts at (*arrow_offset, *orc_offset) and continues until the ORC batch
+// holds `length` entries or the Arrow array is exhausted; both offsets are advanced
+// in place. A slot is marked null when the Arrow value is null, or when
+// incoming_mask is given and holds false at that ORC position.
+arrow::Status WriteDecimalBatch(liborc::ColumnVectorBatch* column_vector_batch,
+                                int64_t* arrow_offset, int64_t* orc_offset,
+                                const int64_t& length, const arrow::Array& array,
+                                const std::vector<bool>* incoming_mask) {
+  const arrow::Decimal128Array& decimal128_array(
+      checked_cast<const arrow::Decimal128Array&>(array));
+  auto batch = checked_cast<liborc::Decimal128VectorBatch*>(column_vector_batch);
+  int64_t arrow_length = array.length();
+  if (!arrow_length) {
+    return arrow::Status::OK();
+  }
+  if (array.null_count() || incoming_mask) {
+    batch->hasNulls = true;
+  }
+  for (; *orc_offset < length && *arrow_offset < arrow_length;
+       (*orc_offset)++, (*arrow_offset)++) {
+    if (array.IsNull(*arrow_offset) ||
+        (incoming_mask && !(*incoming_mask)[*orc_offset])) {
+      batch->notNull[*orc_offset] = false;
+    } else {
+      batch->notNull[*orc_offset] = true;
+      // Read the two 64-bit halves with memcpy instead of casting away const and
+      // type-punning the byte pointer; this is aliasing- and alignment-safe.
+      // As before, the first 8 bytes are taken as the low word and the next 8 as
+      // the high word, each in host byte order.
+      // NOTE(review): presumably relies on a little-endian host -- confirm.
+      const uint8_t* raw_int128 = decimal128_array.GetValue(*arrow_offset);
+      uint64_t lower_bits = 0;
+      int64_t higher_bits = 0;
+      memcpy(&lower_bits, raw_int128, sizeof(lower_bits));
+      memcpy(&higher_bits, raw_int128 + sizeof(lower_bits), sizeof(higher_bits));
+      batch->values[*orc_offset] = liborc::Int128(higher_bits, lower_bits);
+    }
+  }
+  batch->numElements = *orc_offset;
+  return arrow::Status::OK();
+}
+
+// Copies an Arrow StructArray into an ORC StructVectorBatch.
+//
+// The struct-level validity is written first. Starting at (*arrow_offset,
+// *orc_offset), entries are processed until the ORC batch holds `length` entries
+// or the Arrow array is exhausted. An entry is marked null when the Arrow struct is
+// null, or when incoming_mask is given and holds false at that ORC position.
+//
+// Every child field is then written through WriteBatch, restarting from the same
+// initial offsets. If the struct has nulls or was itself masked, a mask holding
+// false at every null struct position is handed to the children so that they are
+// written as null there too. On return the offsets are whatever the last child
+// write left them at.
+arrow::Status WriteStructBatch(liborc::ColumnVectorBatch* column_vector_batch,
+                               int64_t* arrow_offset, int64_t* orc_offset,
+                               const int64_t& length, const arrow::Array& array,
+                               const std::vector<bool>* incoming_mask) {
+  const arrow::StructArray& struct_array(
+      checked_cast<const arrow::StructArray&>(array));
+  auto batch = checked_cast<liborc::StructVectorBatch*>(column_vector_batch);
+  std::size_t size = array.type()->fields().size();
+  int64_t arrow_length = array.length();
+  if (!arrow_length) {
+    return arrow::Status::OK();
+  }
+  const int64_t init_orc_offset = *orc_offset;
+  const int64_t init_arrow_offset = *arrow_offset;
+  // The mask only lives for the duration of this call, so a local vector is
+  // enough; outgoing_mask stays null when there is nothing to propagate.
+  std::vector<bool> mask_storage;
+  const std::vector<bool>* outgoing_mask = nullptr;
+  if (array.null_count() || incoming_mask) {
+    batch->hasNulls = true;
+    mask_storage.assign(length, true);
+    outgoing_mask = &mask_storage;
+  }
+  // First fill the validity of the struct itself.
+  for (; *orc_offset < length && *arrow_offset < arrow_length;
+       (*orc_offset)++, (*arrow_offset)++) {
+    if (array.IsNull(*arrow_offset) ||
+        (incoming_mask && !(*incoming_mask)[*orc_offset])) {
+      batch->notNull[*orc_offset] = false;
+      // This branch is only reachable when the mask was allocated above.
+      mask_storage[*orc_offset] = false;
+    } else {
+      batch->notNull[*orc_offset] = true;
+    }
+  }
+  // Use the absolute count, like every other writer in this file, instead of
+  // adding to whatever value the batch happened to hold before the call.
+  batch->numElements = *orc_offset;
+  // Then fill the child fields, each one from the same starting offsets.
+  for (std::size_t i = 0; i < size; i++) {
+    *orc_offset = init_orc_offset;
+    *arrow_offset = init_arrow_offset;
+    RETURN_NOT_OK(WriteBatch(batch->fields[i], arrow_offset, orc_offset, length,
+                             *(struct_array.field(i)), outgoing_mask));
+  }
+  return arrow::Status::OK();
+}
+
+// Copies an Arrow list-like array (ArrayType) into an ORC ListVectorBatch.
+//
+// Starting at (*arrow_offset, *orc_offset), lists are processed until the ORC batch
+// holds `length` entries or the Arrow array is exhausted; both offsets are advanced
+// in place. A list is marked null when the Arrow value is null, or when
+// incoming_mask is given and holds false at that ORC position; a null list gets an
+// empty offset range. For a non-null list the element batch is resized to the new
+// end offset and the list's values are written through WriteBatch without a mask.
+template <class ArrayType>
+arrow::Status WriteListBatch(liborc::ColumnVectorBatch* column_vector_batch,
+                             int64_t* arrow_offset, int64_t* orc_offset,
+                             const int64_t& length, const arrow::Array& array,
+                             const std::vector<bool>* incoming_mask) {
+  const auto& lists = checked_cast<const ArrayType&>(array);
+  auto* list_batch = checked_cast<liborc::ListVectorBatch*>(column_vector_batch);
+  liborc::ColumnVectorBatch* child_batch = list_batch->elements.get();
+  const int64_t num_lists = array.length();
+  if (num_lists == 0) {
+    return arrow::Status::OK();
+  }
+  if (*orc_offset == 0) {
+    list_batch->offsets[0] = 0;
+  }
+  if (array.null_count() != 0 || incoming_mask != nullptr) {
+    list_batch->hasNulls = true;
+  }
+  while (*orc_offset < length && *arrow_offset < num_lists) {
+    const int64_t orc_pos = *orc_offset;
+    const int64_t arrow_pos = *arrow_offset;
+    const bool is_valid = !array.IsNull(arrow_pos) &&
+                          (incoming_mask == nullptr || (*incoming_mask)[orc_pos]);
+    if (!is_valid) {
+      list_batch->notNull[orc_pos] = false;
+      list_batch->offsets[orc_pos + 1] = list_batch->offsets[orc_pos];
+    } else {
+      list_batch->notNull[orc_pos] = true;
+      // Range of this list's values in the Arrow child array and in the ORC
+      // element batch.
+      int64_t child_arrow_pos = lists.value_offset(arrow_pos);
+      int64_t child_orc_pos = list_batch->offsets[orc_pos];
+      const int64_t child_orc_end =
+          child_orc_pos + lists.value_offset(arrow_pos + 1) - child_arrow_pos;
+      list_batch->offsets[orc_pos + 1] = child_orc_end;
+      child_batch->resize(child_orc_end);
+      RETURN_NOT_OK(WriteBatch(child_batch, &child_arrow_pos, &child_orc_pos,
+                               child_orc_end, *(lists.values()), nullptr));
+    }
+    ++(*orc_offset);
+    ++(*arrow_offset);
+  }
+  list_batch->numElements = *orc_offset;
+  return arrow::Status::OK();
+}
+
+// Copies an Arrow MapArray into an ORC MapVectorBatch.
+//
+// Starting at (*arrow_offset, *orc_offset), maps are processed until the ORC batch
+// holds `length` entries or the Arrow array is exhausted; both offsets are advanced
+// in place. A map is marked null when the Arrow value is null, or when
+// incoming_mask is given and holds false at that ORC position; a null map gets an
+// empty offset range. For a non-null map the key and element batches are resized
+// to the new end offset, then the keys and the items of that map are each written
+// through WriteBatch over the same range, without a mask.
+arrow::Status WriteMapBatch(liborc::ColumnVectorBatch* column_vector_batch,
+                            int64_t* arrow_offset, int64_t* orc_offset,
+                            const int64_t& length, const arrow::Array& array,
+                            const std::vector<bool>* incoming_mask) {
+  const auto& maps = checked_cast<const arrow::MapArray&>(array);
+  auto* map_batch = checked_cast<liborc::MapVectorBatch*>(column_vector_batch);
+  liborc::ColumnVectorBatch* keys_batch = map_batch->keys.get();
+  liborc::ColumnVectorBatch* items_batch = map_batch->elements.get();
+  std::shared_ptr<arrow::Array> keys = maps.keys();
+  std::shared_ptr<arrow::Array> items = maps.items();
+  const int64_t num_maps = array.length();
+  if (num_maps == 0) {
+    return arrow::Status::OK();
+  }
+  if (*orc_offset == 0) {
+    map_batch->offsets[0] = 0;
+  }
+  if (array.null_count() != 0 || incoming_mask != nullptr) {
+    map_batch->hasNulls = true;
+  }
+  while (*orc_offset < length && *arrow_offset < num_maps) {
+    const int64_t orc_pos = *orc_offset;
+    const int64_t arrow_pos = *arrow_offset;
+    const bool is_valid = !array.IsNull(arrow_pos) &&
+                          (incoming_mask == nullptr || (*incoming_mask)[orc_pos]);
+    if (!is_valid) {
+      map_batch->notNull[orc_pos] = false;
+      map_batch->offsets[orc_pos + 1] = map_batch->offsets[orc_pos];
+    } else {
+      map_batch->notNull[orc_pos] = true;
+      // Range of this map's entries in the Arrow children and in the ORC batches.
+      const int64_t entries_arrow_begin = maps.value_offset(arrow_pos);
+      const int64_t entries_orc_begin = map_batch->offsets[orc_pos];
+      const int64_t entries_orc_end =
+          entries_orc_begin + maps.value_offset(arrow_pos + 1) - entries_arrow_begin;
+      map_batch->offsets[orc_pos + 1] = entries_orc_end;
+      keys_batch->resize(entries_orc_end);
+      items_batch->resize(entries_orc_end);
+      // Keys and items cover the same range, so each write gets its own cursors.
+      int64_t key_arrow_pos = entries_arrow_begin;
+      int64_t key_orc_pos = entries_orc_begin;
+      RETURN_NOT_OK(WriteBatch(keys_batch, &key_arrow_pos, &key_orc_pos,
+                               entries_orc_end, *keys, nullptr));
+      int64_t item_arrow_pos = entries_arrow_begin;
+      int64_t item_orc_pos = entries_orc_begin;
+      RETURN_NOT_OK(WriteBatch(items_batch, &item_arrow_pos, &item_orc_pos,
+                               entries_orc_end, *items, nullptr));
+    }
+    ++(*orc_offset);
+    ++(*arrow_offset);
+  }
+  map_batch->numElements = *orc_offset;
+  return arrow::Status::OK();
+}
+
+arrow::Status WriteBatch(liborc::ColumnVectorBatch* column_vector_batch,
+                         int64_t* arrow_offset, int64_t* orc_offset,
+                         const int64_t& length, const arrow::Array& array,
+                         const std::vector<bool>* incoming_mask) {
+  arrow::Type::type kind = array.type_id();
+  switch (kind) {
+    case arrow::Type::type::BOOL:
+      return WriteNumericBatch<arrow::BooleanArray, liborc::LongVectorBatch, 
int64_t>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::INT8:
+      return WriteNumericBatch<arrow::NumericArray<arrow::Int8Type>,
+                               liborc::LongVectorBatch, int64_t>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::INT16:
+      return WriteNumericBatch<arrow::NumericArray<arrow::Int16Type>,
+                               liborc::LongVectorBatch, int64_t>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::INT32:
+      return WriteNumericBatch<arrow::NumericArray<arrow::Int32Type>,
+                               liborc::LongVectorBatch, int64_t>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::INT64:
+      return WriteNumericBatch<arrow::NumericArray<arrow::Int64Type>,
+                               liborc::LongVectorBatch, int64_t>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::FLOAT:
+      return WriteNumericBatch<arrow::NumericArray<arrow::FloatType>,
+                               liborc::DoubleVectorBatch, double>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::DOUBLE:
+      return WriteNumericBatch<arrow::NumericArray<arrow::DoubleType>,
+                               liborc::DoubleVectorBatch, double>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::BINARY:
+      return WriteBinaryBatch<arrow::BinaryArray, int32_t>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::LARGE_BINARY:
+      return WriteBinaryBatch<arrow::LargeBinaryArray, int64_t>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::STRING:
+      return WriteBinaryBatch<arrow::StringArray, int32_t>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::LARGE_STRING:
+      return WriteBinaryBatch<arrow::LargeStringArray, int64_t>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::FIXED_SIZE_BINARY:
+      return WriteFixedSizeBinaryBatch(column_vector_batch, arrow_offset, 
orc_offset,
+                                       length, array, incoming_mask);
+    case arrow::Type::type::DATE32:
+      return WriteNumericBatch<arrow::NumericArray<arrow::Date32Type>,
+                               liborc::LongVectorBatch, int64_t>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::DATE64:
+      return WriteTimestampBatch<arrow::Date64Array>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask,
+          kOneSecondMillis, kOneMilliNanos);
+    case arrow::Type::type::TIMESTAMP: {
+      switch 
(arrow::internal::checked_pointer_cast<arrow::TimestampType>(array.type())
+                  ->unit()) {
+        case arrow::TimeUnit::type::SECOND:
+          return WriteTimestampBatch<arrow::TimestampArray>(
+              column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask,
+              1, kOneSecondNanos);
+        case arrow::TimeUnit::type::MILLI:
+          return WriteTimestampBatch<arrow::TimestampArray>(
+              column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask,
+              kOneSecondMillis, kOneMilliNanos);
+        case arrow::TimeUnit::type::MICRO:
+          return WriteTimestampBatch<arrow::TimestampArray>(
+              column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask,
+              kOneSecondMicros, kOneMicroNanos);
+        default:
+          return WriteTimestampBatch<arrow::TimestampArray>(
+              column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask,
+              kOneSecondNanos, 1);
+      }
+    }
+    case arrow::Type::type::DECIMAL:
+      return WriteDecimalBatch(column_vector_batch, arrow_offset, orc_offset, 
length,
+                               array, incoming_mask);
+    case arrow::Type::type::STRUCT:
+      return WriteStructBatch(column_vector_batch, arrow_offset, orc_offset, 
length,
+                              array, incoming_mask);
+    case arrow::Type::type::LIST:
+      return WriteListBatch<arrow::ListArray>(column_vector_batch, 
arrow_offset,
+                                              orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::LARGE_LIST:
+      return WriteListBatch<arrow::LargeListArray>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::FIXED_SIZE_LIST:
+      return WriteListBatch<arrow::FixedSizeListArray>(
+          column_vector_batch, arrow_offset, orc_offset, length, array, 
incoming_mask);
+    case arrow::Type::type::MAP:
+      return WriteMapBatch(column_vector_batch, arrow_offset, orc_offset, 
length, array,
+                           incoming_mask);
+    default: {
+      return arrow::Status::Invalid("Unknown or unsupported Arrow type kind: 
", kind);

Review comment:
       Thanks! Fixed!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to