mathyingzhou commented on a change in pull request #8648:
URL: https://github.com/apache/arrow/pull/8648#discussion_r561405645



##########
File path: cpp/src/arrow/adapters/orc/adapter_test.cc
##########
@@ -157,4 +225,2478 @@ TEST(TestAdapter, readIntAndStringFileMultipleStripes) {
     EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
   }
 }
+
+// WriteORC tests
+
+// General
+TEST(TestAdapterWriteGeneral, writeZeroRows) {
+  std::vector<std::shared_ptr<Field>> xFields{field("bool", boolean()),
+                                              field("int8", int8()),
+                                              field("int16", int16()),
+                                              field("int32", int32()),
+                                              field("int64", int64()),
+                                              field("float", float32()),
+                                              field("double", float64()),
+                                              field("decimal128nz", 
decimal(25, 6)),
+                                              field("decimal128z", decimal(32, 
0)),
+                                              field("date32", date32()),
+                                              field("ts3", 
timestamp(TimeUnit::NANO)),
+                                              field("string", utf8()),
+                                              field("binary", binary())};
+  std::shared_ptr<Schema> sharedPtrSchema = std::make_shared<Schema>(xFields);
+
+  int64_t numRows = 0;
+  int64_t numCols = xFields.size();
+
+  ArrayBuilderVector builders(numCols, NULLPTR);
+  builders[0] =
+      
std::static_pointer_cast<ArrayBuilder>(std::make_shared<BooleanBuilder>());
+  builders[1] = 
std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int8Builder>());
+  builders[2] = 
std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int16Builder>());
+  builders[3] = 
std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int32Builder>());
+  builders[4] = 
std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int64Builder>());
+  builders[5] = 
std::static_pointer_cast<ArrayBuilder>(std::make_shared<FloatBuilder>());
+  builders[6] = 
std::static_pointer_cast<ArrayBuilder>(std::make_shared<DoubleBuilder>());
+  builders[7] = std::static_pointer_cast<ArrayBuilder>(
+      std::make_shared<Decimal128Builder>(decimal(25, 6)));
+  builders[8] = std::static_pointer_cast<ArrayBuilder>(
+      std::make_shared<Decimal128Builder>(decimal(32, 0)));
+  builders[9] = 
std::static_pointer_cast<ArrayBuilder>(std::make_shared<Date32Builder>());
+  builders[10] =
+      
std::static_pointer_cast<ArrayBuilder>(std::make_shared<TimestampBuilder>(
+          timestamp(TimeUnit::NANO), default_memory_pool()));
+  builders[11] =
+      
std::static_pointer_cast<ArrayBuilder>(std::make_shared<StringBuilder>());
+  builders[12] =
+      
std::static_pointer_cast<ArrayBuilder>(std::make_shared<BinaryBuilder>());
+  ArrayVector arrays(numCols, NULLPTR);
+  ChunkedArrayVector cv;
+  cv.reserve(numCols);
+
+  for (int col = 0; col < numCols; col++) {
+    ARROW_EXPECT_OK(builders[col]->Finish(&arrays[col]));
+    cv.push_back(std::make_shared<ChunkedArray>(arrays[col]));
+  }
+
+  std::shared_ptr<Table> table = Table::Make(sharedPtrSchema, cv);
+
+  std::unique_ptr<ORCMemWriter> writer =
+      std::unique_ptr<ORCMemWriter>(new ORCMemWriter());
+  std::unique_ptr<liborc::OutputStream> out_stream =
+      std::unique_ptr<liborc::OutputStream>(static_cast<liborc::OutputStream*>(
+          new MemoryOutputStream(DEFAULT_SMALL_MEM_STREAM_SIZE / 16)));
+  ARROW_EXPECT_OK(writer->Open(sharedPtrSchema, out_stream));
+  ARROW_EXPECT_OK(writer->Write(table));
+  auto output_mem_stream = 
static_cast<MemoryOutputStream*>(writer->ReleaseOutStream());
+  std::shared_ptr<io::RandomAccessFile> in_stream(
+      new io::BufferReader(std::make_shared<Buffer>(
+          reinterpret_cast<const uint8_t*>(output_mem_stream->getData()),
+          static_cast<int64_t>(output_mem_stream->getLength()))));
+
+  std::unique_ptr<adapters::orc::ORCFileReader> reader;
+  ASSERT_TRUE(
+      adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool(), 
&reader).ok());
+  std::shared_ptr<Table> outputTable;
+  ARROW_EXPECT_OK(reader->Read(&outputTable));
+  EXPECT_EQ(outputTable->num_columns(), numCols);
+  EXPECT_EQ(outputTable->num_rows(), numRows);
+  EXPECT_TRUE(outputTable->Equals(*table));
+}
+TEST(TestAdapterWriteGeneral, writeChunkless) {
+  std::vector<std::shared_ptr<Field>> xFieldsSub{std::make_shared<Field>("a", 
utf8()),
+                                                 std::make_shared<Field>("b", 
int32())};
+  std::vector<std::shared_ptr<Field>> xFields{
+      field("bool", boolean()),
+      field("int8", int8()),
+      field("int16", int16()),
+      field("int32", int32()),
+      field("int64", int64()),
+      field("float", float32()),
+      field("double", float64()),
+      field("decimal128nz", decimal(25, 6)),
+      field("decimal128z", decimal(32, 0)),
+      field("date32", date32()),
+      field("ts3", timestamp(TimeUnit::NANO)),
+      field("string", utf8()),
+      field("binary", binary()),
+      field("struct", struct_(xFieldsSub)),
+      field("list", list(int32())),
+      field("lsl", list(struct_({field("lsl0", list(int32()))})))};
+  std::shared_ptr<Schema> sharedPtrSchema = std::make_shared<Schema>(xFields);
+
+  int64_t numRows = 0;
+  int64_t numCols = xFields.size();
+
+  ChunkedArrayVector cv;
+  cv.reserve(numCols);
+
+  ArrayMatrix av(numCols, ArrayVector(0, NULLPTR));
+
+  for (int col = 0; col < numCols; col++) {
+    cv.push_back(std::make_shared<ChunkedArray>(av[col], 
xFields[col]->type()));
+  }
+
+  std::shared_ptr<Table> table = Table::Make(sharedPtrSchema, cv);
+
+  MemoryOutputStream mem_stream(DEFAULT_SMALL_MEM_STREAM_SIZE);
+  std::unique_ptr<ORCMemWriter> writer =
+      std::unique_ptr<ORCMemWriter>(new ORCMemWriter());
+  std::unique_ptr<liborc::OutputStream> out_stream =
+      std::unique_ptr<liborc::OutputStream>(static_cast<liborc::OutputStream*>(
+          new MemoryOutputStream(DEFAULT_SMALL_MEM_STREAM_SIZE / 16)));
+  ARROW_EXPECT_OK(writer->Open(sharedPtrSchema, out_stream));
+  ARROW_EXPECT_OK(writer->Write(table));
+  auto output_mem_stream = 
static_cast<MemoryOutputStream*>(writer->ReleaseOutStream());
+  std::shared_ptr<io::RandomAccessFile> in_stream(
+      new io::BufferReader(std::make_shared<Buffer>(
+          reinterpret_cast<const uint8_t*>(output_mem_stream->getData()),
+          static_cast<int64_t>(output_mem_stream->getLength()))));
+
+  std::unique_ptr<adapters::orc::ORCFileReader> reader;
+  ASSERT_TRUE(
+      adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool(), 
&reader).ok());
+  std::shared_ptr<Table> outputTable;
+  ARROW_EXPECT_OK(reader->Read(&outputTable));
+  EXPECT_EQ(outputTable->num_columns(), numCols);
+  EXPECT_EQ(outputTable->num_rows(), numRows);
+  EXPECT_TRUE(outputTable->Equals(*table));
+}
+TEST(TestAdapterWriteGeneral, writeAllNulls) {
+  std::vector<std::shared_ptr<Field>> xFields{field("bool", boolean()),
+                                              field("int8", int8()),
+                                              field("int16", int16()),
+                                              field("int32", int32()),
+                                              field("int64", int64()),
+                                              field("decimal128nz", 
decimal(33, 4)),
+                                              field("decimal128z", decimal(35, 
0)),
+                                              field("date32", date32()),
+                                              field("ts3", 
timestamp(TimeUnit::NANO)),
+                                              field("string", utf8()),
+                                              field("binary", binary())};
+  std::shared_ptr<Schema> sharedPtrSchema = std::make_shared<Schema>(xFields);
+
+  int64_t numRows = 10000;
+  int64_t numCols = xFields.size();
+
+  ArrayBuilderMatrix builders(numCols, ArrayBuilderVector(5, NULLPTR));
+
+  for (int i = 0; i < 5; i++) {
+    builders[0][i] =
+        
std::static_pointer_cast<ArrayBuilder>(std::make_shared<BooleanBuilder>());
+    builders[1][i] =
+        
std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int8Builder>());
+    builders[2][i] =
+        
std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int16Builder>());
+    builders[3][i] =
+        
std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int32Builder>());
+    builders[4][i] =
+        
std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int64Builder>());
+    builders[5][i] = std::static_pointer_cast<ArrayBuilder>(
+        std::make_shared<Decimal128Builder>(decimal(33, 4)));
+    builders[6][i] = std::static_pointer_cast<ArrayBuilder>(
+        std::make_shared<Decimal128Builder>(decimal(35, 0)));
+    builders[7][i] =
+        
std::static_pointer_cast<ArrayBuilder>(std::make_shared<Date32Builder>());
+    builders[8][i] =
+        
std::static_pointer_cast<ArrayBuilder>(std::make_shared<TimestampBuilder>(
+            timestamp(TimeUnit::NANO), default_memory_pool()));
+    builders[9][i] =
+        
std::static_pointer_cast<ArrayBuilder>(std::make_shared<StringBuilder>());
+    builders[10][i] =
+        
std::static_pointer_cast<ArrayBuilder>(std::make_shared<BinaryBuilder>());
+  }
+
+  for (int i = 0; i < numRows; i++) {
+    int chunk = i < (numRows / 2) ? 1 : 3;
+    for (int col = 0; col < numCols; col++) {
+      ARROW_EXPECT_OK(builders[col][chunk]->AppendNull());
+    }
+  }
+
+  ArrayMatrix arrays(numCols, ArrayVector(5, NULLPTR));
+  ChunkedArrayVector cv;
+  cv.reserve(numCols);
+
+  for (int col = 0; col < numCols; col++) {
+    for (int i = 0; i < 5; i++) {
+      ARROW_EXPECT_OK(builders[col][i]->Finish(&arrays[col][i]));
+    }
+    cv.push_back(std::make_shared<ChunkedArray>(arrays[col]));
+  }
+
+  std::shared_ptr<Table> table = Table::Make(sharedPtrSchema, cv);

Review comment:
       > I didn't see anything rust related in this PR so I removed the Rust 
label
   
   @alamb Thanks! There isn’t anything Parquet-related either. Can that be 
removed as well?

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -40,15 +44,21 @@ namespace orc {
 
 using internal::checked_cast;
 
-// The number of nanoseconds in a second
+// The number of milliseconds, microseconds and nanoseconds in a second
+constexpr int64_t kOneSecondMillis = 1000LL;
+constexpr int64_t kOneMicroNanos = 1000LL;
+constexpr int64_t kOneSecondMicros = 1000000LL;
+constexpr int64_t kOneMilliNanos = 1000000LL;
 constexpr int64_t kOneSecondNanos = 1000000000LL;
+// Jan 1st 2015 in UNIX timestamp
+// constexpr int64_t kConverter = 1420070400LL;
 
 Status AppendStructBatch(const liborc::Type* type, liborc::ColumnVectorBatch* 
cbatch,
                          int64_t offset, int64_t length, ArrayBuilder* 
abuilder) {
   auto builder = checked_cast<StructBuilder*>(abuilder);
   auto batch = checked_cast<liborc::StructVectorBatch*>(cbatch);
 
-  const uint8_t* valid_bytes = nullptr;
+  const uint8_t* valid_bytes = NULLPTR;

Review comment:
       Thanks! That's in the ORC reader though. I can file a different PR to 
fix the reader (and add Union support to it).

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -40,15 +44,21 @@ namespace orc {
 
 using internal::checked_cast;
 
-// The number of nanoseconds in a second
+// The number of milliseconds, microseconds and nanoseconds in a second
+constexpr int64_t kOneSecondMillis = 1000LL;
+constexpr int64_t kOneMicroNanos = 1000LL;
+constexpr int64_t kOneSecondMicros = 1000000LL;
+constexpr int64_t kOneMilliNanos = 1000000LL;
 constexpr int64_t kOneSecondNanos = 1000000000LL;
+// Jan 1st 2015 in UNIX timestamp
+// constexpr int64_t kConverter = 1420070400LL;
 
 Status AppendStructBatch(const liborc::Type* type, liborc::ColumnVectorBatch* 
cbatch,
                          int64_t offset, int64_t length, ArrayBuilder* 
abuilder) {
   auto builder = checked_cast<StructBuilder*>(abuilder);
   auto batch = checked_cast<liborc::StructVectorBatch*>(cbatch);
 
-  const uint8_t* valid_bytes = nullptr;
+  const uint8_t* valid_bytes = NULLPTR;

Review comment:
       Thanks! This one is in the ORC reader though. I can file a different PR 
to fix the reader (and add Union support to it).

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -316,10 +326,482 @@ Status AppendBatch(const liborc::Type* type, 
liborc::ColumnVectorBatch* batch,
   }
 }
 
+template <class array_type, class batch_type>
+Status FillNumericBatch(const DataType* type, liborc::ColumnVectorBatch* 
cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t 
length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<batch_type*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;

Review comment:
       Thanks! I will fix all of them.

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -316,10 +326,482 @@ Status AppendBatch(const liborc::Type* type, 
liborc::ColumnVectorBatch* batch,
   }
 }
 
+template <class array_type, class batch_type>
+Status FillNumericBatch(const DataType* type, liborc::ColumnVectorBatch* 
cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t 
length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<batch_type*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, 
arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && 
!(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->data[orcOffset] = array->Value(arrowOffset);
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+template <class array_type, class batch_type, class target_type>
+Status FillNumericBatchCast(const DataType* type, liborc::ColumnVectorBatch* 
cbatch,
+                            int64_t& arrowOffset, int64_t& orcOffset, int64_t 
length,
+                            Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<batch_type*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, 
arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && 
!(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->data[orcOffset] = 
static_cast<target_type>(array->Value(arrowOffset));
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillDate64Batch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t 
length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<Date64Array*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, 
arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && 
!(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      int64_t miliseconds = array->Value(arrowOffset);
+      batch->data[orcOffset] =
+          static_cast<int64_t>(std::floor(miliseconds / kOneSecondMillis));
+      batch->nanoseconds[orcOffset] =
+          (miliseconds - kOneSecondMillis * batch->data[orcOffset]) * 
kOneMilliNanos;
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillTimestampBatch(const DataType* type, liborc::ColumnVectorBatch* 
cbatch,
+                          int64_t& arrowOffset, int64_t& orcOffset, int64_t 
length,
+                          Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<TimestampArray*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, 
arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && 
!(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      int64_t data = array->Value(arrowOffset);
+      batch->notNull[orcOffset] = true;
+      switch (std::static_pointer_cast<TimestampType>(array->type())->unit()) {
+        case TimeUnit::type::SECOND: {
+          batch->data[orcOffset] = data;
+          batch->nanoseconds[orcOffset] = 0;
+          break;
+        }
+        case TimeUnit::type::MILLI: {
+          batch->data[orcOffset] =
+              static_cast<int64_t>(std::floor(data / kOneSecondMillis));
+          batch->nanoseconds[orcOffset] =
+              (data - kOneSecondMillis * batch->data[orcOffset]) * 
kOneMilliNanos;
+          break;
+        }
+        case TimeUnit::type::MICRO: {
+          batch->data[orcOffset] =
+              static_cast<int64_t>(std::floor(data / kOneSecondMicros));
+          batch->nanoseconds[orcOffset] =
+              (data - kOneSecondMicros * batch->data[orcOffset]) * 
kOneMicroNanos;
+          break;
+        }
+        default: {
+          batch->data[orcOffset] =
+              static_cast<int64_t>(std::floor(data / kOneSecondNanos));
+          batch->nanoseconds[orcOffset] = data - kOneSecondNanos * 
batch->data[orcOffset];
+        }
+      }
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+template <class array_type>
+Status FillStringBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t 
length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, 
arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && 
!(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      std::string dataString = array->GetString(arrowOffset);
+      int dataStringLength = dataString.length();
+      if (batch->data[orcOffset]) delete batch->data[orcOffset];
+      batch->data[orcOffset] = new char[dataStringLength + 1];  // Include null
+      memcpy(batch->data[orcOffset], dataString.c_str(), dataStringLength + 1);
+      batch->length[orcOffset] = dataStringLength;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+template <class array_type, class offset_type>
+Status FillBinaryBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t 
length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, 
arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && 
!(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      offset_type dataLength = 0;
+      const uint8_t* data = array->GetValue(arrowOffset, &dataLength);
+      if (batch->data[orcOffset]) delete batch->data[orcOffset];
+      batch->data[orcOffset] = new char[dataLength];  // Do not include null
+      memcpy(batch->data[orcOffset], data, dataLength);
+      batch->length[orcOffset] = dataLength;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillFixedSizeBinaryBatch(const DataType* type, 
liborc::ColumnVectorBatch* cbatch,
+                                int64_t& arrowOffset, int64_t& orcOffset, 
int64_t length,
+                                Array* parray, std::vector<bool>* 
incomingMask) {
+  auto array = checked_cast<FixedSizeBinaryArray*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  int32_t byteWidth = array->byte_width();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, 
arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && 
!(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      const uint8_t* data = array->GetValue(arrowOffset);
+      if (batch->data[orcOffset]) delete batch->data[orcOffset];
+      batch->data[orcOffset] = new char[byteWidth];  // Do not include null
+      memcpy(batch->data[orcOffset], data, byteWidth);
+      batch->length[orcOffset] = byteWidth;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// If Arrow supports 256-bit decimals we can not support it unless ORC does it
+Status FillDecimalBatch(const DataType* type, liborc::ColumnVectorBatch* 
cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t 
length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<Decimal128Array*>(parray);
+  auto batch = checked_cast<liborc::Decimal128VectorBatch*>(cbatch);
+  // Arrow uses 128 bits for decimal type and in the future, 256 bits will 
also be
+  // supported.
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, 
arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && 
!(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      uint8_t* rawInt128 = const_cast<uint8_t*>(array->GetValue(arrowOffset));
+      uint64_t* lowerBits = reinterpret_cast<uint64_t*>(rawInt128);
+      int64_t* higherBits = reinterpret_cast<int64_t*>(rawInt128 + 8);
+      batch->values[orcOffset] = liborc::Int128(*higherBits, *lowerBits);
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillStructBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t 
length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<StructArray*>(parray);
+  auto batch = checked_cast<liborc::StructVectorBatch*>(cbatch);
+  std::shared_ptr<std::vector<bool>> outgoingMask;
+  std::size_t size = type->fields().size();
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  int64_t initORCOffset = orcOffset;
+  int64_t initArrowOffset = arrowOffset;
+  // First fill fields of ColumnVectorBatch
+  if (array->null_count() || incomingMask) {
+    batch->hasNulls = true;
+    outgoingMask = std::make_shared<std::vector<bool>>(length, true);
+  } else {
+    outgoingMask = NULLPTR;
+  }
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, 
arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && 
!(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+      (*outgoingMask)[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements += orcOffset - initORCOffset;
+  // Fill the fields
+  for (std::size_t i = 0; i < size; i++) {
+    orcOffset = initORCOffset;
+    arrowOffset = initArrowOffset;
+    RETURN_NOT_OK(FillBatch(type->field(i)->type().get(), batch->fields[i], 
arrowOffset,
+                            orcOffset, length, array->field(i).get(),
+                            outgoingMask.get()));
+  }
+  return Status::OK();
+}
+
+template <class array_type>
+Status FillListBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                     int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                     Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<liborc::ListVectorBatch*>(cbatch);
+  auto elementBatch = (batch->elements).get();
+  DataType* elementType = array->value_type().get();
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (orcOffset == 0) batch->offsets[0] = 0;
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, 
arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && 
!(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset];
+    } else {
+      batch->notNull[orcOffset] = true;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset] +
+                                      array->value_offset(arrowOffset + 1) -
+                                      array->value_offset(arrowOffset);
+      elementBatch->resize(batch->offsets[orcOffset + 1]);
+      int64_t subarrayArrowOffset = array->value_offset(arrowOffset),
+              subarrayORCOffset = batch->offsets[orcOffset],
+              subarrayORCLength = batch->offsets[orcOffset + 1];
+      RETURN_NOT_OK(FillBatch(elementType, elementBatch, subarrayArrowOffset,
+                              subarrayORCOffset, subarrayORCLength, 
array->values().get(),
+                              NULLPTR));
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillFixedSizeListBatch(const DataType* type, liborc::ColumnVectorBatch* 
cbatch,
+                              int64_t& arrowOffset, int64_t& orcOffset, 
int64_t length,
+                              Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<FixedSizeListArray*>(parray);
+  auto batch = checked_cast<liborc::ListVectorBatch*>(cbatch);
+  auto elementBatch = (batch->elements).get();
+  DataType* elementType = array->value_type().get();
+  int64_t arrowLength = array->length();
+  int32_t elementLength = array->value_length();  // Fixed length of each 
subarray
+  if (!arrowLength) return Status::OK();
+  if (orcOffset == 0) batch->offsets[0] = 0;
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, 
arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && 
!(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset];
+    } else {
+      batch->notNull[orcOffset] = true;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset] + 
elementLength;
+      int64_t subarrayArrowOffset = array->value_offset(arrowOffset),
+              subarrayORCOffset = batch->offsets[orcOffset],
+              subarrayORCLength = batch->offsets[orcOffset + 1];
+      elementBatch->resize(subarrayORCLength);
+      RETURN_NOT_OK(FillBatch(elementType, elementBatch, subarrayArrowOffset,
+                              subarrayORCOffset, subarrayORCLength, 
array->values().get(),
+                              NULLPTR));
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillMapBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                    int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                    Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<MapArray*>(parray);
+  auto batch = checked_cast<liborc::MapVectorBatch*>(cbatch);
+  auto keyBatch = (batch->keys).get();

Review comment:
       Sure! 

##########
File path: cpp/src/arrow/adapters/orc/adapter.cc
##########
@@ -473,6 +473,105 @@ int64_t ORCFileReader::NumberOfStripes() { return 
impl_->NumberOfStripes(); }
 
 int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
 
+class ArrowOutputStream : public liborc::OutputStream {
+ public:
+  explicit ArrowOutputStream(const std::shared_ptr<io::OutputStream>& 
output_stream)
+      : output_stream_(output_stream), length_(0) {}
+
+  uint64_t getLength() const override { return length_; }
+
+  uint64_t getNaturalWriteSize() const override { return 
ORC_NATURAL_WRITE_SIZE; }
+
+  void write(const void* buf, size_t length) override {
+    ORC_THROW_NOT_OK(output_stream_->Write(buf, static_cast<int64_t>(length)));
+    length_ += static_cast<int64_t>(length);
+  }
+
+  const std::string& getName() const override {
+    static const std::string filename("ArrowOutputFile");
+    return filename;
+  }
+
+  void close() override {
+    if (!output_stream_->closed()) {
+      ORC_THROW_NOT_OK(output_stream_->Close());
+    }
+  }
+
+  void set_length(int64_t length) { length_ = length; }
+
+ private:
+  std::shared_ptr<io::OutputStream> output_stream_;
+  int64_t length_;
+};
+
+class ORCFileWriter::Impl {
+ public:
+  Status Open(const std::shared_ptr<Schema>& schema,
+              const std::shared_ptr<io::OutputStream>& output_stream) {
+    orc_options_ = std::unique_ptr<liborc::WriterOptions>(new 
liborc::WriterOptions());
+    outStream_ = ORC_UNIQUE_PTR<liborc::OutputStream>(
+        static_cast<liborc::OutputStream*>(new 
ArrowOutputStream(output_stream)));
+    ORC_THROW_NOT_OK(GetORCType(*schema, &orcSchema_));
+    try {
+      writer_ = createWriter(*orcSchema_, outStream_.get(), *orc_options_);
+    } catch (const liborc::ParseError& e) {
+      return Status::IOError(e.what());
+    }
+    schema_ = schema;
+    num_cols_ = schema->num_fields();
+    return Status::OK();
+  }
+  Status Write(const std::shared_ptr<Table> table) {
+    int64_t numRows = table->num_rows();
+    int64_t batch_size = 1024;  // Doesn't matter what it is
+    std::vector<int64_t> arrowIndexOffset(num_cols_, 0);
+    std::vector<int> arrowChunkOffset(num_cols_, 0);
+    ORC_UNIQUE_PTR<liborc::ColumnVectorBatch> batch = 
writer_->createRowBatch(batch_size);
+    liborc::StructVectorBatch* root =
+        internal::checked_cast<liborc::StructVectorBatch*>(batch.get());
+    std::vector<liborc::ColumnVectorBatch*> fields = root->fields;
+    while (numRows > 0) {
+      for (int i = 0; i < num_cols_; i++) {
+        ORC_THROW_NOT_OK(adapters::orc::FillBatch(
+            schema_->field(i)->type().get(), fields[i], arrowIndexOffset[i],
+            arrowChunkOffset[i], batch_size, table->column(i).get()));
+      }
+      root->numElements = fields[0]->numElements;

Review comment:
       It is. If `root->numElements` is not set, it is 0 and nothing gets 
written.

##########
File path: cpp/src/arrow/adapters/orc/adapter.cc
##########
@@ -473,6 +473,105 @@ int64_t ORCFileReader::NumberOfStripes() { return 
impl_->NumberOfStripes(); }
 
 int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
 
+// Adapter exposing an Arrow io::OutputStream through the liborc::OutputStream
+// interface. It forwards writes and close requests to the wrapped stream and
+// keeps its own running count of the bytes handed to write().
+class ArrowOutputStream : public liborc::OutputStream {
+ public:
+  explicit ArrowOutputStream(const std::shared_ptr<io::OutputStream>& output_stream)
+      : output_stream_(output_stream), length_(0) {}
+
+  // Byte count accumulated by write(), or the value last given to set_length().
+  uint64_t getLength() const override { return length_; }
+
+  uint64_t getNaturalWriteSize() const override { return ORC_NATURAL_WRITE_SIZE; }
+
+  // Forwards `length` bytes from `buf` to the wrapped stream, then adds them
+  // to the running byte count.
+  void write(const void* buf, size_t length) override {
+    const int64_t num_bytes = static_cast<int64_t>(length);
+    ORC_THROW_NOT_OK(output_stream_->Write(buf, num_bytes));
+    length_ += num_bytes;
+  }
+
+  // Fixed label; the wrapped stream is not asked for a name.
+  const std::string& getName() const override {
+    static const std::string filename("ArrowOutputFile");
+    return filename;
+  }
+
+  // Closes the wrapped stream unless it already reports itself as closed.
+  void close() override {
+    if (output_stream_->closed()) {
+      return;
+    }
+    ORC_THROW_NOT_OK(output_stream_->Close());
+  }
+
+  // Overrides the running byte count.
+  void set_length(int64_t length) { length_ = length; }
+
+ private:
+  std::shared_ptr<io::OutputStream> output_stream_;
+  int64_t length_;
+};
+
+class ORCFileWriter::Impl {
+ public:
+  Status Open(const std::shared_ptr<Schema>& schema,
+              const std::shared_ptr<io::OutputStream>& output_stream) {
+    orc_options_ = std::unique_ptr<liborc::WriterOptions>(new 
liborc::WriterOptions());
+    outStream_ = ORC_UNIQUE_PTR<liborc::OutputStream>(
+        static_cast<liborc::OutputStream*>(new 
ArrowOutputStream(output_stream)));
+    ORC_THROW_NOT_OK(GetORCType(*schema, &orcSchema_));
+    try {
+      writer_ = createWriter(*orcSchema_, outStream_.get(), *orc_options_);
+    } catch (const liborc::ParseError& e) {
+      return Status::IOError(e.what());
+    }
+    schema_ = schema;
+    num_cols_ = schema->num_fields();
+    return Status::OK();
+  }
+  Status Write(const std::shared_ptr<Table> table) {
+    int64_t numRows = table->num_rows();
+    int64_t batch_size = 1024;  // Doesn't matter what it is
+    std::vector<int64_t> arrowIndexOffset(num_cols_, 0);
+    std::vector<int> arrowChunkOffset(num_cols_, 0);
+    ORC_UNIQUE_PTR<liborc::ColumnVectorBatch> batch = 
writer_->createRowBatch(batch_size);
+    liborc::StructVectorBatch* root =
+        internal::checked_cast<liborc::StructVectorBatch*>(batch.get());
+    std::vector<liborc::ColumnVectorBatch*> fields = root->fields;
+    while (numRows > 0) {
+      for (int i = 0; i < num_cols_; i++) {
+        ORC_THROW_NOT_OK(adapters::orc::FillBatch(
+            schema_->field(i)->type().get(), fields[i], arrowIndexOffset[i],
+            arrowChunkOffset[i], batch_size, table->column(i).get()));

Review comment:
       This is intentional. Multiple ORC batches and multiple Arrow chunks are 
expected.

##########
File path: cpp/src/arrow/adapters/orc/adapter.h
##########
@@ -19,12 +19,16 @@
 
 #include <cstdint>
 #include <memory>
+#include <sstream>
 #include <vector>
 
+#include "arrow/io/file.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/memory_pool.h"
 #include "arrow/record_batch.h"
 #include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/table_builder.h"

Review comment:
       Thanks! This has been fixed.

##########
File path: cpp/src/arrow/adapters/orc/adapter.h
##########
@@ -19,12 +19,16 @@
 
 #include <cstdint>
 #include <memory>
+#include <sstream>
 #include <vector>
 
+#include "arrow/io/file.h"

Review comment:
       Thanks! This has been fixed.

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -316,10 +326,482 @@ Status AppendBatch(const liborc::Type* type, 
liborc::ColumnVectorBatch* batch,
   }
 }
 
+// Copies values from an Arrow array (`array_type`) into an ORC vector batch
+// (`batch_type`), starting at `arrowOffset` in the array and `orcOffset` in the
+// batch. Both offsets are advanced in place and copying stops when either the
+// array is exhausted or `orcOffset` reaches `length`.
+//
+// A slot is written as null when the Arrow value is null or when `incomingMask`
+// is provided and is false at that ORC position. `type` is not used here.
+// An empty Arrow array returns immediately without touching the batch.
+template <class array_type, class batch_type>
+Status FillNumericBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto* source = checked_cast<array_type*>(parray);
+  auto* target = checked_cast<batch_type*>(cbatch);
+  const int64_t sourceLength = source->length();
+  if (sourceLength == 0) {
+    return Status::OK();
+  }
+  if (source->null_count() != 0 || incomingMask != nullptr) {
+    target->hasNulls = true;
+  }
+  while (orcOffset < length && arrowOffset < sourceLength) {
+    const bool isValid = !source->IsNull(arrowOffset) &&
+                         (incomingMask == nullptr || (*incomingMask)[orcOffset]);
+    if (isValid) {
+      target->data[orcOffset] = source->Value(arrowOffset);
+    }
+    target->notNull[orcOffset] = isValid;
+    ++orcOffset;
+    ++arrowOffset;
+  }
+  target->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Same contract as the non-casting numeric filler, except that every copied
+// value is converted with static_cast<target_type> before it is stored.
+//
+// Offsets are advanced in place; copying stops when the Arrow array is
+// exhausted or `orcOffset` reaches `length`. A slot is null when the Arrow
+// value is null or `incomingMask` (if given) is false at that ORC position.
+// `type` is not used here. An empty Arrow array returns without touching the
+// batch.
+template <class array_type, class batch_type, class target_type>
+Status FillNumericBatchCast(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                            int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                            Array* parray, std::vector<bool>* incomingMask) {
+  auto* source = checked_cast<array_type*>(parray);
+  auto* target = checked_cast<batch_type*>(cbatch);
+  const int64_t sourceLength = source->length();
+  if (sourceLength == 0) {
+    return Status::OK();
+  }
+  if (source->null_count() != 0 || incomingMask != nullptr) {
+    target->hasNulls = true;
+  }
+  while (orcOffset < length && arrowOffset < sourceLength) {
+    const bool isValid = !source->IsNull(arrowOffset) &&
+                         (incomingMask == nullptr || (*incomingMask)[orcOffset]);
+    if (isValid) {
+      target->data[orcOffset] = static_cast<target_type>(source->Value(arrowOffset));
+    }
+    target->notNull[orcOffset] = isValid;
+    ++orcOffset;
+    ++arrowOffset;
+  }
+  target->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Fills an ORC TimestampVectorBatch from an Arrow Date64Array, whose values are
+// read here as a count of milliseconds.
+//
+// `arrowOffset` / `orcOffset` are advanced in place; copying stops when the
+// Arrow array is exhausted or `orcOffset` reaches `length`. A slot is null when
+// the Arrow value is null or `incomingMask` (if given) is false at that ORC
+// position. `type` is not used here. An empty Arrow array returns without
+// touching the batch.
+//
+// Each value is split into whole seconds (rounded toward negative infinity) and
+// a non-negative nanosecond remainder.
+// NOTE(review): the previous code used std::floor(ms / kOneSecondMillis); if
+// that constant is integral the division already truncated toward zero, so
+// negative values produced a negative nanosecond part. Confirm the constant's
+// type; the integer floor division below is correct either way.
+Status FillDate64Batch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<Date64Array*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  const int64_t millisPerSecond = static_cast<int64_t>(kOneSecondMillis);
+  const int64_t nanosPerMilli = static_cast<int64_t>(kOneMilliNanos);
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      const int64_t milliseconds = array->Value(arrowOffset);
+      int64_t seconds = milliseconds / millisPerSecond;
+      int64_t remainder = milliseconds % millisPerSecond;
+      if (remainder < 0) {
+        // C++ integer division truncates toward zero; step down to the floor
+        --seconds;
+        remainder += millisPerSecond;
+      }
+      batch->data[orcOffset] = seconds;
+      batch->nanoseconds[orcOffset] = remainder * nanosPerMilli;
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Fills an ORC TimestampVectorBatch from an Arrow TimestampArray of any unit.
+//
+// `arrowOffset` / `orcOffset` are advanced in place; copying stops when the
+// Arrow array is exhausted or `orcOffset` reaches `length`. A slot is null when
+// the Arrow value is null or `incomingMask` (if given) is false at that ORC
+// position. `type` is not used here; the unit is taken from the array's own
+// type. An empty Arrow array returns without touching the batch.
+//
+// Each value is split into whole seconds (rounded toward negative infinity) and
+// a non-negative nanosecond remainder. Units other than SECOND, MILLI and MICRO
+// are handled as nanoseconds.
+// NOTE(review): the previous code used std::floor(value / kOneSecond*); if those
+// constants are integral the division already truncated toward zero, so
+// negative values produced a negative nanosecond part. Confirm the constants'
+// type; the integer floor division below is correct either way.
+Status FillTimestampBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                          int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                          Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<TimestampArray*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+
+  // The unit belongs to the array, so resolve the conversion factors once
+  // instead of once per element.
+  int64_t unitsPerSecond = 1;
+  int64_t nanosPerUnit = 1;
+  switch (std::static_pointer_cast<TimestampType>(array->type())->unit()) {
+    case TimeUnit::type::SECOND: {
+      unitsPerSecond = 1;
+      nanosPerUnit = static_cast<int64_t>(kOneSecondNanos);
+      break;
+    }
+    case TimeUnit::type::MILLI: {
+      unitsPerSecond = static_cast<int64_t>(kOneSecondMillis);
+      nanosPerUnit = static_cast<int64_t>(kOneMilliNanos);
+      break;
+    }
+    case TimeUnit::type::MICRO: {
+      unitsPerSecond = static_cast<int64_t>(kOneSecondMicros);
+      nanosPerUnit = static_cast<int64_t>(kOneMicroNanos);
+      break;
+    }
+    default: {
+      unitsPerSecond = static_cast<int64_t>(kOneSecondNanos);
+      nanosPerUnit = 1;
+    }
+  }
+
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      const int64_t data = array->Value(arrowOffset);
+      int64_t seconds = data / unitsPerSecond;
+      int64_t remainder = data % unitsPerSecond;
+      if (remainder < 0) {
+        // C++ integer division truncates toward zero; step down to the floor
+        --seconds;
+        remainder += unitsPerSecond;
+      }
+      batch->notNull[orcOffset] = true;
+      batch->data[orcOffset] = seconds;
+      batch->nanoseconds[orcOffset] = remainder * nanosPerUnit;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Fills an ORC StringVectorBatch from an Arrow string array (`array_type`).
+//
+// `arrowOffset` / `orcOffset` are advanced in place; copying stops when the
+// Arrow array is exhausted or `orcOffset` reaches `length`. A slot is null when
+// the Arrow value is null or `incomingMask` (if given) is false at that ORC
+// position. `type` is not used here. An empty Arrow array returns without
+// touching the batch.
+//
+// Every non-null slot receives a freshly allocated, NUL-terminated copy of the
+// string; the terminator is not counted in `batch->length`.
+// NOTE(review): this assumes each batch->data slot is either null or a buffer
+// previously allocated here with new[] -- confirm the slots are initialised
+// before the first fill.
+template <class array_type>
+Status FillStringBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      const std::string dataString = array->GetString(arrowOffset);
+      // std::size_t avoids narrowing the length to int
+      const std::size_t dataStringLength = dataString.length();
+      // Allocate before releasing so the slot never dangles if new throws
+      char* buffer = new char[dataStringLength + 1];  // Include null
+      memcpy(buffer, dataString.c_str(), dataStringLength + 1);
+      // The buffers come from new[], so they must be released with delete[]
+      // (deleting a null pointer is a no-op).
+      delete[] batch->data[orcOffset];
+      batch->data[orcOffset] = buffer;
+      batch->length[orcOffset] = static_cast<int64_t>(dataStringLength);
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Fills an ORC StringVectorBatch from an Arrow binary array (`array_type`),
+// whose value lengths are reported as `offset_type`.
+//
+// `arrowOffset` / `orcOffset` are advanced in place; copying stops when the
+// Arrow array is exhausted or `orcOffset` reaches `length`. A slot is null when
+// the Arrow value is null or `incomingMask` (if given) is false at that ORC
+// position. `type` is not used here. An empty Arrow array returns without
+// touching the batch.
+//
+// Every non-null slot receives a freshly allocated copy of the bytes, without a
+// terminator.
+// NOTE(review): this assumes each batch->data slot is either null or a buffer
+// previously allocated here with new[] -- confirm the slots are initialised
+// before the first fill.
+template <class array_type, class offset_type>
+Status FillBinaryBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      offset_type dataLength = 0;
+      const uint8_t* data = array->GetValue(arrowOffset, &dataLength);
+      // Allocate before releasing so the slot never dangles if new throws
+      char* buffer = new char[dataLength];  // Do not include null
+      if (dataLength > 0) {
+        // Skip the copy for empty values so memcpy never sees a null source
+        memcpy(buffer, data, dataLength);
+      }
+      // The buffers come from new[], so they must be released with delete[]
+      // (deleting a null pointer is a no-op).
+      delete[] batch->data[orcOffset];
+      batch->data[orcOffset] = buffer;
+      batch->length[orcOffset] = dataLength;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Fills an ORC StringVectorBatch from an Arrow FixedSizeBinaryArray; every
+// non-null slot holds exactly `byte_width()` bytes.
+//
+// `arrowOffset` / `orcOffset` are advanced in place; copying stops when the
+// Arrow array is exhausted or `orcOffset` reaches `length`. A slot is null when
+// the Arrow value is null or `incomingMask` (if given) is false at that ORC
+// position. `type` is not used here. An empty Arrow array returns without
+// touching the batch.
+//
+// NOTE(review): this assumes each batch->data slot is either null or a buffer
+// previously allocated here with new[] -- confirm the slots are initialised
+// before the first fill.
+Status FillFixedSizeBinaryBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                                int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                                Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<FixedSizeBinaryArray*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  int32_t byteWidth = array->byte_width();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      const uint8_t* data = array->GetValue(arrowOffset);
+      // Allocate before releasing so the slot never dangles if new throws
+      char* buffer = new char[byteWidth];  // Do not include null
+      if (byteWidth > 0) {
+        memcpy(buffer, data, byteWidth);
+      }
+      // The buffers come from new[], so they must be released with delete[]
+      // (deleting a null pointer is a no-op).
+      delete[] batch->data[orcOffset];
+      batch->data[orcOffset] = buffer;
+      batch->length[orcOffset] = byteWidth;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Fills an ORC Decimal128VectorBatch from an Arrow Decimal128Array.
+//
+// Only 128-bit decimals are handled: wider Arrow decimals (e.g. 256-bit) cannot
+// be written unless ORC gains a matching type.
+//
+// `arrowOffset` / `orcOffset` are advanced in place; copying stops when the
+// Arrow array is exhausted or `orcOffset` reaches `length`. A slot is null when
+// the Arrow value is null or `incomingMask` (if given) is false at that ORC
+// position. `type` is not used here. An empty Arrow array returns without
+// touching the batch.
+Status FillDecimalBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<Decimal128Array*>(parray);
+  auto batch = checked_cast<liborc::Decimal128VectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      const uint8_t* rawInt128 = array->GetValue(arrowOffset);
+      // Copy the two 64-bit words out with memcpy rather than reading through
+      // reinterpret_cast'ed pointers: the byte buffer carries no alignment or
+      // aliasing guarantee for 64-bit loads, and no const_cast is needed.
+      // NOTE(review): as before, the first 8 bytes are taken as the low word,
+      // which presumes a little-endian in-memory layout -- confirm.
+      uint64_t lowerBits = 0;
+      int64_t higherBits = 0;
+      memcpy(&lowerBits, rawInt128, sizeof(lowerBits));
+      memcpy(&higherBits, rawInt128 + sizeof(lowerBits), sizeof(higherBits));
+      batch->values[orcOffset] = liborc::Int128(higherBits, lowerBits);
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Fills an ORC StructVectorBatch from an Arrow StructArray.
+//
+// The struct-level validity is written first: a slot is null when the Arrow
+// value is null or `incomingMask` (if given) is false at that ORC position.
+// When nulls are possible, a mask of size `length` records those null slots and
+// is handed to every child so that children see their parent's nulls.
+//
+// Each child field is then filled through FillBatch over the same range: both
+// offsets are rewound to their entry values before every child, so on return
+// they hold whatever the last child left (or the struct-level end position if
+// there are no children). An empty Arrow array returns without touching the
+// batch.
+//
+// NOTE(review): numElements is incremented by the rows written here, whereas
+// the sibling fillers assign `orcOffset` directly -- confirm this is intended
+// when the batch is reused.
+Status FillStructBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto* structArray = checked_cast<StructArray*>(parray);
+  auto* structBatch = checked_cast<liborc::StructVectorBatch*>(cbatch);
+  const std::size_t numChildren = type->fields().size();
+  const int64_t structLength = structArray->length();
+  if (structLength == 0) {
+    return Status::OK();
+  }
+  const int64_t orcStart = orcOffset;
+  const int64_t arrowStart = arrowOffset;
+
+  // Struct-level validity, plus the mask propagated to the children
+  std::shared_ptr<std::vector<bool>> childMask;
+  if (structArray->null_count() != 0 || incomingMask != nullptr) {
+    structBatch->hasNulls = true;
+    childMask = std::make_shared<std::vector<bool>>(length, true);
+  }
+  while (orcOffset < length && arrowOffset < structLength) {
+    const bool isValid = !structArray->IsNull(arrowOffset) &&
+                         (incomingMask == nullptr || (*incomingMask)[orcOffset]);
+    structBatch->notNull[orcOffset] = isValid;
+    if (!isValid) {
+      (*childMask)[orcOffset] = false;
+    }
+    ++orcOffset;
+    ++arrowOffset;
+  }
+  structBatch->numElements += orcOffset - orcStart;
+
+  // Children: every field covers the same row range as the struct itself
+  for (std::size_t child = 0; child < numChildren; ++child) {
+    orcOffset = orcStart;
+    arrowOffset = arrowStart;
+    RETURN_NOT_OK(FillBatch(type->field(child)->type().get(), structBatch->fields[child],
+                            arrowOffset, orcOffset, length,
+                            structArray->field(child).get(), childMask.get()));
+  }
+  return Status::OK();
+}
+
+// Fills an ORC ListVectorBatch from an Arrow list array (`array_type`).
+//
+// `arrowOffset` / `orcOffset` are advanced in place; copying stops when the
+// Arrow array is exhausted or `orcOffset` reaches `length`. A slot is null when
+// the Arrow value is null or `incomingMask` (if given) is false at that ORC
+// position; a null slot contributes an empty range to `offsets`. `type` is not
+// used here; the element type is taken from the array. An empty Arrow array
+// returns without touching the batch.
+//
+// For each non-null list the element batch is resized to the new end offset and
+// the list's values are appended through FillBatch with no mask.
+template <class array_type>
+Status FillListBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                     int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                     Array* parray, std::vector<bool>* incomingMask) {
+  auto* listArray = checked_cast<array_type*>(parray);
+  auto* listBatch = checked_cast<liborc::ListVectorBatch*>(cbatch);
+  liborc::ColumnVectorBatch* childBatch = listBatch->elements.get();
+  DataType* childType = listArray->value_type().get();
+  const int64_t listCount = listArray->length();
+  if (listCount == 0) {
+    return Status::OK();
+  }
+  if (orcOffset == 0) {
+    listBatch->offsets[0] = 0;
+  }
+  if (listArray->null_count() != 0 || incomingMask != nullptr) {
+    listBatch->hasNulls = true;
+  }
+  while (orcOffset < length && arrowOffset < listCount) {
+    const bool isValid = !listArray->IsNull(arrowOffset) &&
+                         (incomingMask == nullptr || (*incomingMask)[orcOffset]);
+    listBatch->notNull[orcOffset] = isValid;
+    if (!isValid) {
+      listBatch->offsets[orcOffset + 1] = listBatch->offsets[orcOffset];
+    } else {
+      const int64_t arrowBegin = listArray->value_offset(arrowOffset);
+      const int64_t arrowEnd = listArray->value_offset(arrowOffset + 1);
+      listBatch->offsets[orcOffset + 1] =
+          listBatch->offsets[orcOffset] + (arrowEnd - arrowBegin);
+      // FillBatch takes its offsets by reference, so hand it scratch copies
+      int64_t childArrowOffset = arrowBegin;
+      int64_t childORCOffset = listBatch->offsets[orcOffset];
+      int64_t childORCEnd = listBatch->offsets[orcOffset + 1];
+      childBatch->resize(childORCEnd);
+      RETURN_NOT_OK(FillBatch(childType, childBatch, childArrowOffset, childORCOffset,
+                              childORCEnd, listArray->values().get(), NULLPTR));
+    }
+    ++orcOffset;
+    ++arrowOffset;
+  }
+  listBatch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Fills an ORC ListVectorBatch from an Arrow FixedSizeListArray; every non-null
+// list advances the ORC offsets by the array's fixed `value_length()`.
+//
+// `arrowOffset` / `orcOffset` are advanced in place; copying stops when the
+// Arrow array is exhausted or `orcOffset` reaches `length`. A slot is null when
+// the Arrow value is null or `incomingMask` (if given) is false at that ORC
+// position; a null slot contributes an empty range to `offsets`. `type` is not
+// used here; the element type is taken from the array. An empty Arrow array
+// returns without touching the batch.
+//
+// For each non-null list the element batch is resized to the new end offset and
+// the list's values are appended through FillBatch with no mask.
+Status FillFixedSizeListBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                              int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                              Array* parray, std::vector<bool>* incomingMask) {
+  auto* listArray = checked_cast<FixedSizeListArray*>(parray);
+  auto* listBatch = checked_cast<liborc::ListVectorBatch*>(cbatch);
+  liborc::ColumnVectorBatch* childBatch = listBatch->elements.get();
+  DataType* childType = listArray->value_type().get();
+  const int64_t listCount = listArray->length();
+  const int32_t listWidth = listArray->value_length();  // Fixed length of each subarray
+  if (listCount == 0) {
+    return Status::OK();
+  }
+  if (orcOffset == 0) {
+    listBatch->offsets[0] = 0;
+  }
+  if (listArray->null_count() != 0 || incomingMask != nullptr) {
+    listBatch->hasNulls = true;
+  }
+  while (orcOffset < length && arrowOffset < listCount) {
+    const bool isValid = !listArray->IsNull(arrowOffset) &&
+                         (incomingMask == nullptr || (*incomingMask)[orcOffset]);
+    listBatch->notNull[orcOffset] = isValid;
+    if (!isValid) {
+      listBatch->offsets[orcOffset + 1] = listBatch->offsets[orcOffset];
+    } else {
+      listBatch->offsets[orcOffset + 1] = listBatch->offsets[orcOffset] + listWidth;
+      // FillBatch takes its offsets by reference, so hand it scratch copies
+      int64_t childArrowOffset = listArray->value_offset(arrowOffset);
+      int64_t childORCOffset = listBatch->offsets[orcOffset];
+      int64_t childORCEnd = listBatch->offsets[orcOffset + 1];
+      childBatch->resize(childORCEnd);
+      RETURN_NOT_OK(FillBatch(childType, childBatch, childArrowOffset, childORCOffset,
+                              childORCEnd, listArray->values().get(), NULLPTR));
+    }
+    ++orcOffset;
+    ++arrowOffset;
+  }
+  listBatch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Fills an ORC MapVectorBatch from an Arrow MapArray.
+//
+// `arrowOffset` / `orcOffset` are advanced in place; copying stops when the
+// Arrow array is exhausted or `orcOffset` reaches `length`. A slot is null when
+// the Arrow value is null or `incomingMask` (if given) is false at that ORC
+// position; a null slot contributes an empty range to `offsets`. `type` is not
+// used here; key and item types are taken from the child arrays. An empty Arrow
+// array returns without touching the batch.
+//
+// For each non-null map the key and element batches are resized to the new end
+// offset, then the keys and the items of that map are appended through FillBatch
+// over the same range, each with no mask.
+Status FillMapBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                    int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                    Array* parray, std::vector<bool>* incomingMask) {
+  auto* mapArray = checked_cast<MapArray*>(parray);
+  auto* mapBatch = checked_cast<liborc::MapVectorBatch*>(cbatch);
+  auto keyBatch = (mapBatch->keys).get();
+  auto itemBatch = (mapBatch->elements).get();
+  auto keyArray = mapArray->keys().get();
+  auto itemArray = mapArray->items().get();
+  DataType* keyType = keyArray->type().get();
+  DataType* itemType = itemArray->type().get();
+  const int64_t mapCount = mapArray->length();
+  if (mapCount == 0) {
+    return Status::OK();
+  }
+  if (orcOffset == 0) {
+    mapBatch->offsets[0] = 0;
+  }
+  if (mapArray->null_count() != 0 || incomingMask != nullptr) {
+    mapBatch->hasNulls = true;
+  }
+  while (orcOffset < length && arrowOffset < mapCount) {
+    const bool isValid = !mapArray->IsNull(arrowOffset) &&
+                         (incomingMask == nullptr || (*incomingMask)[orcOffset]);
+    mapBatch->notNull[orcOffset] = isValid;
+    if (!isValid) {
+      mapBatch->offsets[orcOffset + 1] = mapBatch->offsets[orcOffset];
+    } else {
+      const int64_t arrowBegin = mapArray->value_offset(arrowOffset);
+      const int64_t arrowEnd = mapArray->value_offset(arrowOffset + 1);
+      mapBatch->offsets[orcOffset + 1] =
+          mapBatch->offsets[orcOffset] + (arrowEnd - arrowBegin);
+      const int64_t orcBegin = mapBatch->offsets[orcOffset];
+      int64_t entriesORCEnd = mapBatch->offsets[orcOffset + 1];
+      keyBatch->resize(entriesORCEnd);
+      itemBatch->resize(entriesORCEnd);
+
+      // FillBatch advances the offsets it is given, so keys and items each get
+      // their own scratch copies starting from the same positions.
+      int64_t keyArrowOffset = arrowBegin;
+      int64_t keyORCOffset = orcBegin;
+      RETURN_NOT_OK(FillBatch(keyType, keyBatch, keyArrowOffset, keyORCOffset,
+                              entriesORCEnd, keyArray, NULLPTR));
+      int64_t itemArrowOffset = arrowBegin;
+      int64_t itemORCOffset = orcBegin;
+      RETURN_NOT_OK(FillBatch(itemType, itemBatch, itemArrowOffset, itemORCOffset,
+                              entriesORCEnd, itemArray, NULLPTR));
+    }
+    ++orcOffset;
+    ++arrowOffset;
+  }
+  mapBatch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                 int64_t& arrowOffset, int64_t& orcOffset, int64_t length, 
Array* parray,
+                 std::vector<bool>* incomingMask) {

Review comment:
       Sure! I will!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to