This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new d86f992fc KUDU-1261 introduce serdes utilities for scalar arrays
d86f992fc is described below

commit d86f992fc8d460970f73d065c860d765e86e5b93
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Sep 25 23:33:49 2025 -0700

    KUDU-1261 introduce serdes utilities for scalar arrays
    
    This changelist introduces serialization utility functions Serialize()
    and SerializeIntoArena() for one-dimensional arrays
    (common/array_type_serdes.h).  Accordingly, for accessing the data
    in the serialized buffer, the ArrayCellMetadataView class is introduced
    (common/array_cell_view.h).
    
    This changelist also adds implementation of ArrayTypeTraits' methods:
      * void AppendDebugStringForValue(const void* val, std::string* str)
      * int Compare(const void* lhs, const void* rhs)
      * bool AreConsecutive(const void* a, const void* b)
    
    Corresponding unit tests are included: while the current coverage isn't
    exhaustive, it seems to be good enough as a starting point, and more
    tests can be added later on, if necessary.
    
    Change-Id: I7384580ea9eaa37e6f49dbc9746d9994c3f7ade6
    Reviewed-on: http://gerrit.cloudera.org:8080/23461
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Abhishek Chennaka <[email protected]>
---
 src/kudu/common/CMakeLists.txt            |   1 +
 src/kudu/common/array_cell_view.h         | 282 ++++++++++++++++++++++++++++
 src/kudu/common/array_type_serdes-test.cc | 102 +++++++++++
 src/kudu/common/array_type_serdes.h       | 284 ++++++++++++++++++++++++++++
 src/kudu/common/types-test.cc             | 295 ++++++++++++++++++++++++++++++
 src/kudu/common/types.h                   | 192 ++++++++++++++++++-
 src/kudu/util/bitmap-test.cc              |  48 +++++
 src/kudu/util/bitmap.cc                   |  16 ++
 src/kudu/util/bitmap.h                    |   3 +
 9 files changed, 1215 insertions(+), 8 deletions(-)

diff --git a/src/kudu/common/CMakeLists.txt b/src/kudu/common/CMakeLists.txt
index 07447ab26..2ad6d4e18 100644
--- a/src/kudu/common/CMakeLists.txt
+++ b/src/kudu/common/CMakeLists.txt
@@ -105,6 +105,7 @@ ADD_EXPORTABLE_LIBRARY(kudu_common
 #######################################
 
 SET_KUDU_TEST_LINK_LIBS(kudu_common)
+ADD_KUDU_TEST(array_type_serdes-test)
 ADD_KUDU_TEST(columnar_serialization-test)
 ADD_KUDU_TEST(columnblock-test)
 ADD_KUDU_TEST(column_predicate-test NUM_SHARDS 4)
diff --git a/src/kudu/common/array_cell_view.h b/src/kudu/common/array_cell_view.h
new file mode 100644
index 000000000..06c7ae369
--- /dev/null
+++ b/src/kudu/common/array_cell_view.h
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include <flatbuffers/vector.h>
+#include <flatbuffers/verifier.h>
+#include <glog/logging.h>
+#include <gtest/gtest_prod.h>
+
+#include "kudu/common/common.pb.h"
+#include "kudu/common/serdes/array1d.fb.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/bitmap.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+constexpr serdes::ScalarArray KuduToScalarArrayType(DataType data_type) {
+  switch (data_type) {
+    case INT8:
+      return serdes::ScalarArray::Int8Array;
+    case BOOL:
+    case UINT8:
+      return serdes::ScalarArray::UInt8Array;
+    case INT16:
+      return serdes::ScalarArray::Int16Array;
+    case UINT16:
+      return serdes::ScalarArray::UInt16Array;
+    case DATE:
+    case INT32:
+      return serdes::ScalarArray::Int32Array;
+    case UINT32:
+      return serdes::ScalarArray::UInt32Array;
+    case INT64:
+    case UNIXTIME_MICROS:
+      return serdes::ScalarArray::Int64Array;
+    case UINT64:
+      return serdes::ScalarArray::UInt64Array;
+    case FLOAT:
+      return serdes::ScalarArray::FloatArray;
+    case DOUBLE:
+      return serdes::ScalarArray::DoubleArray;
+    case STRING:
+    case VARCHAR:
+      return serdes::ScalarArray::StringArray;
+    case BINARY:
+      return serdes::ScalarArray::BinaryArray;
+    case DECIMAL32:
+      return serdes::ScalarArray::Int32Array;
+    case DECIMAL64:
+      return serdes::ScalarArray::Int64Array;
+    case DECIMAL128:
+    case INT128:
+      LOG(DFATAL) << DataType_Name(data_type)
+                 << ": type isn't yet supported in 1D arrays";
+      return serdes::ScalarArray::NONE;
+    default:
+      LOG(DFATAL) << "unknown type: " << DataType_Name(data_type);
+      return serdes::ScalarArray::NONE;
+  }
+}
+
+class ArrayCellMetadataView final {
+ public:
+  // buf: data raw pointer
+  // size: size of the buffer (bytes) pointed at by the 'buf' pointer
+  ArrayCellMetadataView(const uint8_t* buf, const size_t size)
+       : data_(buf),
+         size_(size),
+         content_(nullptr),
+         is_initialized_(false) {
+  }
+
+  Status Init() {
+    DCHECK(!is_initialized_);
+    if (size_ == 0) {
+      DCHECK(!data_);
+      content_ = nullptr;
+      is_initialized_ = true;
+      return Status::OK();
+    }
+
+    DCHECK_GT(size_, 0);
+    {
+      flatbuffers::Verifier::Options opt;
+      // While verifying the input data, rely on the built-in constant of
+      // FLATBUFFERS_MAX_BUFFER_SIZE.  2GiBytes - 1 bytes seems big enough
+      // to fit a single one-dimensional array.
+      if (PREDICT_FALSE(size_ + 1 > FLATBUFFERS_MAX_BUFFER_SIZE)) {
+        return Status::InvalidArgument("serialized flatbuffers too big");
+      }
+
+      // Keep the verifier's parameters strict:
+      //   * depth of 3 is enough to verify contents of serdes::Binary, the type
+      //     from array1d.fbs of the deepest nesting
+      //   * maximum number of tables to verify is set to 65536 + 1 to support
+      //     the maximum possible number of serdes::UInt8Array elements as the
+      //     contents of serdes::BinaryArray table: 65536 is the limitation
+      //     based on CFile's array data block format, and extra 1 comes from
+      //     the top-level Content table (see serdes/array1d.fbs)
+      //   * max_size is set to the size of the memory buffer plus one extra
+      //     byte due to the strict 'less than' (not 'less than or equal')
+      //     comparison criteria in the flatbuffers' logic that asserts the
+      //     buffers size restriction
+      opt.max_depth = 3;
+      opt.max_tables = 65536 + 1;
+      opt.max_size = size_ + 1;
+
+      flatbuffers::Verifier verifier(data_, size_, opt);
+      if (PREDICT_FALSE(!serdes::VerifyContentBuffer(verifier))) {
+        return Status::Corruption("corrupted flatbuffers data");
+      }
+    }
+
+    content_ = serdes::GetContent(data_);
+    if (PREDICT_FALSE(!content_)) {
+      return Status::IllegalState("null flatbuffers of non-zero size");
+    }
+
+    DCHECK(content_);
+    if (const size_t bit_num = content_->validity()->size(); bit_num != 0) {
+      bitmap_.reset(new uint8_t[BitmapSize(bit_num)]);
+      auto* bm = bitmap_.get();
+      const auto* v = content_->validity();
+      for (size_t idx = 0; idx < bit_num; ++idx) {
+        if (v->Get(idx) != 0) {
+          BitmapSet(bm, idx);
+        } else {
+          BitmapClear(bm, idx);
+        }
+      }
+    }
+    const auto data_type = content_->data_type();
+    if (data_type != serdes::ScalarArray::BinaryArray &&
+        data_type != serdes::ScalarArray::StringArray) {
+      // For non-binary types, there is nothing else to do.
+      is_initialized_ = true;
+      return Status::OK();
+    }
+
+    // Build the metadata on the spans of binary/string elements
+    // in the buffer.
+    if (data_type == serdes::ScalarArray::StringArray) {
+      const auto* values = content_->data_as<serdes::StringArray>()->values();
+      binary_data_spans_.reserve(values->size());
+      for (auto cit = values->cbegin(); cit != values->end(); ++cit) {
+        const auto* str = *cit;
+        DCHECK(str);
+        binary_data_spans_.emplace_back(str->c_str(), str->size());
+      }
+    } else {
+      DCHECK(serdes::ScalarArray::BinaryArray == data_type);
+      const auto* values = content_->data_as<serdes::BinaryArray>()->values();
+      binary_data_spans_.reserve(values->size());
+      for (auto cit = values->cbegin(); cit != values->end(); ++cit) {
+        const auto* byte_seq = cit->values();
+        DCHECK(byte_seq);
+        binary_data_spans_.emplace_back(byte_seq->Data(), byte_seq->size());
+      }
+    }
+    is_initialized_ = true;
+    return Status::OK();
+  }
+
+  ~ArrayCellMetadataView() = default;
+
+  // Number of elements in the array.
+  size_t elem_num() const {
+    DCHECK(is_initialized_);
+    return content_ ? content_->validity()->size() : 0;
+  }
+
+  bool empty() const {
+    DCHECK(is_initialized_);
+    return content_ ? content_->validity()->empty() : true;
+  }
+
+  // Non-null (a.k.a. validity) bitmap for the array elements.
+  const uint8_t* not_null_bitmap() const {
+    DCHECK(is_initialized_);
+    return bitmap_.get();
+  }
+
+  const uint8_t* data_as(DataType data_type) const {
+    DCHECK(is_initialized_);
+    if (empty()) {
+      return nullptr;
+    }
+    if (PREDICT_FALSE(content_->data_type() != KuduToScalarArrayType(data_type))) {
+      return nullptr;
+    }
+    switch (data_type) {
+      case DataType::BOOL:
+        return data<serdes::UInt8Array>();
+      case DataType::INT8:
+        return data<serdes::Int8Array>();
+      case DataType::UINT8:
+        return data<serdes::UInt8Array>();
+      case DataType::INT16:
+        return data<serdes::Int16Array>();
+      case DataType::UINT16:
+        return data<serdes::UInt16Array>();
+      case DataType::DATE:
+      case DataType::DECIMAL32:
+      case DataType::INT32:
+        return data<serdes::Int32Array>();
+      case DataType::UINT32:
+        return data<serdes::UInt32Array>();
+      case DataType::DECIMAL64:
+      case DataType::INT64:
+      case DataType::UNIXTIME_MICROS:
+        return data<serdes::Int64Array>();
+      case DataType::UINT64:
+        return data<serdes::UInt64Array>();
+      case DataType::FLOAT:
+        return data<serdes::FloatArray>();
+      case DataType::DOUBLE:
+        return data<serdes::DoubleArray>();
+      case DataType::BINARY:
+      case DataType::STRING:
+      case DataType::VARCHAR:
+        // For binary/non-integer types, Kudu expects Slice elements.
+        return reinterpret_cast<const uint8_t*>(binary_data_spans_.data());
+      default:
+        DCHECK(false) << "unsupported type: " << DataType_Name(data_type);
+        return nullptr;
+    }
+  }
+
+ private:
+  FRIEND_TEST(ArrayTypeSerdesTest, Basic);
+
+  template<typename T>
+  const uint8_t* data() const {
+    DCHECK(is_initialized_);
+    return content_ ? content_->data_as<T>()->values()->Data() : nullptr;
+  }
+
+  // Flatbuffer-encoded data; a non-owning raw pointer.
+  const uint8_t* data_;
+
+  // Size of the encoded data, i.e. the number of bytes in the memory after
+  // the 'data_' pointer that represent the serialized array.
+  const size_t size_;
+
+  // A non-owning raw pointer to the flatbuffers serialized buffer. It's nullptr
+  // for an empty (size_ == 0) buffer.
+  const serdes::Content* content_;
+
+  // A bitmap built of the boolean validity vector.
+  // TODO(aserbin): switch array1d to bitfield instead of bool vector for validity?
+  std::unique_ptr<uint8_t[]> bitmap_;
+
+  // Spans of binary data in the serialized buffer. This is populated only
+  // for string/binary types.
+  std::vector<Slice> binary_data_spans_;
+
+  // Whether the Init() method has been successfully run for this object.
+  bool is_initialized_;
+};
+
+} // namespace kudu
diff --git a/src/kudu/common/array_type_serdes-test.cc b/src/kudu/common/array_type_serdes-test.cc
new file mode 100644
index 000000000..be180b9a6
--- /dev/null
+++ b/src/kudu/common/array_type_serdes-test.cc
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/common/array_type_serdes.h"
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/common/array_cell_view.h"
+#include "kudu/common/common.pb.h"
+#include "kudu/common/types.h"
+#include "kudu/util/bitmap.h"
+#include "kudu/util/memory/arena.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_macros.h"
+
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+
+namespace serdes {
+struct Int32Array;
+} // namespace serdes
+
+TEST(ArrayTypeSerdesTest, Basic) {
+  const vector<int32_t> val{ 0, 1, 12, 5, 26, 42, };
+  const uint8_t validity_bitmap[] = { 0b00111010 };
+  const vector<bool> validity_vector(BitmapToVector(validity_bitmap, val.size()));
+  ASSERT_EQ(val.size(), validity_vector.size());
+
+  unique_ptr<uint8_t[]> buf_data;
+  size_t buf_data_size = 0;
+  ASSERT_OK(Serialize(GetTypeInfo(INT32),
+                      reinterpret_cast<const uint8_t*>(val.data()),
+                      val.size(),
+                      validity_vector,
+                      &buf_data,
+                      &buf_data_size));
+  ASSERT_TRUE(buf_data);
+  const Slice cell(buf_data.get(), buf_data_size);
+
+  Arena arena(128);
+  Slice arena_cell;
+  ASSERT_OK(SerializeIntoArena(
+      GetTypeInfo(INT32),
+      reinterpret_cast<const uint8_t*>(val.data()),
+      validity_bitmap,
+      val.size(),
+      &arena,
+      &arena_cell));
+
+  // Make sure Serialize() and SerializeIntoArena() produce the same data.
+  ASSERT_EQ(cell, arena_cell);
+
+  // Peek into the serialized buffer using ArrayCellMetadataView and compare
+  // the source data with the view into the serialized buffer.
+  ArrayCellMetadataView view(cell.data(), cell.size());
+  ASSERT_OK(view.Init());
+  ASSERT_EQ(val.size(), view.elem_num());
+  const auto* view_validity_bitmap = view.not_null_bitmap();
+  ASSERT_TRUE(BitmapEquals(validity_bitmap, view_validity_bitmap, view.elem_num()));
+
+  // Verify the data matches the source.
+  {
+    const uint8_t* data_view = view.data_as(INT32);
+    ASSERT_NE(nullptr, data_view);
+    ASSERT_EQ(0, memcmp(data_view, val.data(), sizeof(int32_t) * val.size()));
+  }
+  {
+    const uint8_t* data_view = view.data<serdes::Int32Array>();
+    ASSERT_NE(nullptr, data_view);
+    ASSERT_EQ(0, memcmp(data_view, val.data(), sizeof(int32_t) * val.size()));
+  }
+
+  // Try peeking at the data as of wrong type: it should return nullptr.
+  {
+    ASSERT_EQ(nullptr, view.data_as(UINT32));
+    ASSERT_EQ(nullptr, view.data_as(INT64));
+  }
+}
+
+} // namespace kudu
diff --git a/src/kudu/common/array_type_serdes.h b/src/kudu/common/array_type_serdes.h
new file mode 100644
index 000000000..a6b5a8b8f
--- /dev/null
+++ b/src/kudu/common/array_type_serdes.h
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include <flatbuffers/base.h>
+#include <flatbuffers/buffer.h>
+#include <flatbuffers/flatbuffer_builder.h>
+#include <flatbuffers/stl_emulation.h>
+
+#include "kudu/common/serdes/array1d.fb.h"
+#include "kudu/common/types.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/bitmap.h"
+#include "kudu/util/memory/arena.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+template<DataType KUDU_DATA_TYPE, typename FB_TYPE>
+void BuildFlatbuffers(
+    const uint8_t* column_data,
+    size_t nrows,
+    const std::vector<bool>& validity,
+    flatbuffers::FlatBufferBuilder* bld) {
+  typedef typename DataTypeTraits<KUDU_DATA_TYPE>::cpp_type ElementType;
+
+  DCHECK(bld);
+  auto& builder = *bld;
+
+  std::vector<ElementType> val;
+  val.resize(nrows);
+  if (column_data != nullptr) {
+    DCHECK_NE(0, nrows);
+    if constexpr (std::is_same<Slice, ElementType>::value) {
+      static const Slice kEmptySlice(static_cast<uint8_t*>(nullptr), 0);
+      const Slice* ptr = reinterpret_cast<const Slice*>(column_data);
+      for (size_t idx = 0; idx < nrows; ++idx) {
+        val[idx] = validity[idx] ? *(ptr + idx) : kEmptySlice;
+      }
+    } else {
+      static_assert(!std::is_same<Slice, ElementType>::value,
+                    "cannot be a binary type");
+      memcpy(val.data(), column_data, nrows * sizeof(ElementType));
+    }
+  }
+
+  auto validity_vector = builder.CreateVector(validity);
+  if constexpr (KUDU_DATA_TYPE == STRING) {
+    auto values = FB_TYPE::Traits::Create(
+        builder, builder.CreateVectorOfStrings<ElementType>(val));
+    builder.Finish(CreateContent(builder,
+                                 KuduToScalarArrayType(KUDU_DATA_TYPE),
+                                 values.Union(),
+                                 validity_vector));
+  } else if constexpr (KUDU_DATA_TYPE == BINARY) {
+    std::vector<flatbuffers::Offset<serdes::UInt8Array>> offsets;
+    offsets.reserve(val.size());
+    for (const auto& e : val) {
+      auto ev = builder.CreateVector(
+          reinterpret_cast<const uint8_t*>(e.data()), e.size());
+      offsets.emplace_back(serdes::CreateUInt8Array(builder, ev));
+    }
+
+    auto values = CreateBinaryArrayDirect(builder, &offsets);
+    builder.Finish(CreateContent(builder,
+                                 KuduToScalarArrayType(KUDU_DATA_TYPE),
+                                 values.Union(),
+                                 validity_vector));
+  } else {
+    auto values = FB_TYPE::Traits::Create(
+        builder, builder.CreateVector<ElementType>(val));
+    builder.Finish(CreateContent(builder,
+                                 KuduToScalarArrayType(KUDU_DATA_TYPE),
+                                 values.Union(),
+                                 validity_vector));
+  }
+}
+
+template<DataType KUDU_DATA_TYPE, typename FB_TYPE>
+Status Serialize(
+    const uint8_t* column_data,
+    size_t nrows,
+    const std::vector<bool>& validity,
+    std::unique_ptr<uint8_t[]>* out_buf,
+    size_t* out_buf_size) {
+  typedef typename DataTypeTraits<KUDU_DATA_TYPE>::cpp_type ElementType;
+
+  DCHECK(out_buf);
+  DCHECK(out_buf_size);
+  DCHECK_EQ(validity.size(), nrows);
+
+  if (PREDICT_FALSE(column_data == nullptr && nrows > 0)) {
+    return Status::InvalidArgument("inconsistent data and validity info for array");
+  }
+
+  flatbuffers::FlatBufferBuilder builder(
+      nrows * sizeof(ElementType) + nrows + FLATBUFFERS_MIN_BUFFER_SIZE);
+  BuildFlatbuffers<KUDU_DATA_TYPE, FB_TYPE>(column_data, nrows, validity, &builder);
+  DCHECK(builder.GetBufferPointer());
+
+  // TODO(aserbin): would it be better to copy the data from the builder
+  //                instead of using ReleaseRaw?
+  size_t buf_size = 0;
+  size_t buf_offset = 0;
+  std::unique_ptr<uint8_t[]> buf(builder.ReleaseRaw(buf_size, buf_offset));
+  DCHECK_LT(buf_offset, buf_size);
+  DCHECK(buf);
+  // TODO(aserbin): introduce offset and use it instead of calling memmove()?
+  memmove(buf.get(), buf.get() + buf_offset, buf_size - buf_offset);
+
+  *out_buf = std::move(buf);
+  *out_buf_size = buf_size - buf_offset;
+
+  return Status::OK();
+}
+
+template<DataType KUDU_DATA_TYPE, typename FB_TYPE>
+Status SerializeIntoArena(
+    const uint8_t* column_data,
+    const uint8_t* validity_bitmap,
+    size_t nrows,
+    Arena* arena,
+    Slice* out) {
+  typedef typename DataTypeTraits<KUDU_DATA_TYPE>::cpp_type ElementType;
+
+  DCHECK(arena);
+  DCHECK(out);
+
+  flatbuffers::FlatBufferBuilder builder(
+      nrows * sizeof(ElementType) + nrows + FLATBUFFERS_MIN_BUFFER_SIZE);
+  const std::vector<bool>& validity = BitmapToVector(validity_bitmap, nrows);
+  BuildFlatbuffers<KUDU_DATA_TYPE, FB_TYPE>(column_data, nrows, validity, &builder);
+
+  // Copy the serialized data into the arena.
+  //
+  // TODO(aserbin): is it possible to avoid copying and serialize directly into
+  //                the provided Arena by using the arena's allocator as the
+  //                custom allocator for FlatbufferBuilder, and then maybe use
+  //                DetachedBuffer?
+  const auto* buf = builder.GetBufferPointer();
+  DCHECK(buf);
+  const auto data_size = builder.GetSize();
+
+  uint8_t* data = reinterpret_cast<uint8_t*>(arena->AllocateBytes(data_size));
+  if (PREDICT_FALSE(!data)) {
+    return Status::RuntimeError("out of memory serialzing array column data");
+  }
+  memcpy(data, buf, data_size);
+  *out = Slice(data, data_size);
+  return Status::OK();
+}
+
+inline Status Serialize(
+    const TypeInfo* elem_typeinfo,
+    const uint8_t* column_data,
+    size_t nrows,
+    const std::vector<bool>& validity,
+    std::unique_ptr<uint8_t[]>* out_buf,
+    size_t* out_buf_size) {
+  switch (elem_typeinfo->type()) {
+    case INT8:
+      return Serialize<INT8, serdes::Int8Array>(
+          column_data, nrows, validity, out_buf, out_buf_size);
+    case BOOL:
+    case UINT8:
+      return Serialize<UINT8, serdes::UInt8Array>(
+          column_data, nrows, validity, out_buf, out_buf_size);
+    case INT16:
+      return Serialize<INT16, serdes::Int16Array>(
+          column_data, nrows, validity, out_buf, out_buf_size);
+    case UINT16:
+      return Serialize<UINT16, serdes::UInt16Array>(
+          column_data, nrows, validity, out_buf, out_buf_size);
+    case DATE:
+    case DECIMAL32:
+    case INT32:
+      return Serialize<INT32, serdes::Int32Array>(
+          column_data, nrows, validity, out_buf, out_buf_size);
+    case UINT32:
+      return Serialize<UINT32, serdes::UInt32Array>(
+          column_data, nrows, validity, out_buf, out_buf_size);
+    case DECIMAL64:
+    case INT64:
+    case UNIXTIME_MICROS:
+      return Serialize<INT64, serdes::Int64Array>(
+          column_data, nrows, validity, out_buf, out_buf_size);
+    case UINT64:
+      return Serialize<UINT64, serdes::UInt64Array>(
+          column_data, nrows, validity, out_buf, out_buf_size);
+    case FLOAT:
+      return Serialize<FLOAT, serdes::FloatArray>(
+          column_data, nrows, validity, out_buf, out_buf_size);
+    case DOUBLE:
+      return Serialize<DOUBLE, serdes::DoubleArray>(
+          column_data, nrows, validity, out_buf, out_buf_size);
+    case BINARY:
+      return Serialize<BINARY, serdes::BinaryArray>(
+          column_data, nrows, validity, out_buf, out_buf_size);
+    case STRING:
+    case VARCHAR:
+      return Serialize<STRING, serdes::StringArray>(
+          column_data, nrows, validity, out_buf, out_buf_size);
+    default:
+      return Status::NotSupported("unsupported array element type",
+                                  DataType_Name(elem_typeinfo->type()));
+  }
+}
+
+inline Status SerializeIntoArena(
+    const TypeInfo* elem_typeinfo,
+    const uint8_t* column_data,
+    const uint8_t* validity_bitmap,
+    size_t nrows,
+    Arena* arena,
+    Slice* out) {
+  switch (elem_typeinfo->type()) {
+    case INT8:
+      return SerializeIntoArena<INT8, serdes::Int8Array>(
+          column_data, validity_bitmap, nrows, arena, out);
+    case BOOL:
+    case UINT8:
+      return SerializeIntoArena<UINT8, serdes::UInt8Array>(
+          column_data, validity_bitmap, nrows, arena, out);
+    case INT16:
+      return SerializeIntoArena<INT16, serdes::Int16Array>(
+          column_data, validity_bitmap, nrows, arena, out);
+    case UINT16:
+      return SerializeIntoArena<UINT16, serdes::UInt16Array>(
+          column_data, validity_bitmap, nrows, arena, out);
+    case DATE:
+    case DECIMAL32:
+    case INT32:
+      return SerializeIntoArena<INT32, serdes::Int32Array>(
+          column_data, validity_bitmap, nrows, arena, out);
+    case UINT32:
+      return SerializeIntoArena<UINT32, serdes::UInt32Array>(
+          column_data, validity_bitmap, nrows, arena, out);
+    case DECIMAL64:
+    case INT64:
+    case UNIXTIME_MICROS:
+      return SerializeIntoArena<INT64, serdes::Int64Array>(
+          column_data, validity_bitmap, nrows, arena, out);
+    case UINT64:
+      return SerializeIntoArena<UINT64, serdes::UInt64Array>(
+          column_data, validity_bitmap, nrows, arena, out);
+    case FLOAT:
+      return SerializeIntoArena<FLOAT, serdes::FloatArray>(
+          column_data, validity_bitmap, nrows, arena, out);
+    case DOUBLE:
+      return SerializeIntoArena<DOUBLE, serdes::DoubleArray>(
+          column_data, validity_bitmap, nrows, arena, out);
+    case BINARY:
+      return SerializeIntoArena<BINARY, serdes::BinaryArray>(
+          column_data, validity_bitmap, nrows, arena, out);
+    case STRING:
+    case VARCHAR:
+      return SerializeIntoArena<STRING, serdes::StringArray>(
+          column_data, validity_bitmap, nrows, arena, out);
+    default:
+      return Status::NotSupported("unsupported array element type",
+                                  DataType_Name(elem_typeinfo->type()));
+  }
+}
+
+} // namespace kudu
diff --git a/src/kudu/common/types-test.cc b/src/kudu/common/types-test.cc
index 4ab2ba6d1..0edff6680 100644
--- a/src/kudu/common/types-test.cc
+++ b/src/kudu/common/types-test.cc
@@ -18,10 +18,13 @@
 #include "kudu/common/types.h"
 
 #include <cmath>
+#include <cstddef>
 #include <cstdint>
 #include <initializer_list>
 #include <limits>
+#include <memory>
 #include <string>
+#include <string_view>
 #include <tuple>  // IWYU pragma: keep
 #include <utility>
 #include <variant>
@@ -30,17 +33,21 @@
 #include <gflags/gflags.h>
 #include <gtest/gtest.h>
 
+#include "kudu/common/array_type_serdes.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/gutil/mathlimits.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/slice.h"
+#include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
 using std::get;
 using std::make_tuple;
 using std::nextafter;
 using std::string;
+using std::string_view;
 using std::tuple;
+using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
@@ -242,4 +249,292 @@ TEST_F(TestTypes, ArrayTypeNoRegistryEntries) {
   }
 }
 
+// Verifying how ArrayTypeTraits<T>::AppendDebugStringForValue(...) works
+// for a few data types.
+TEST_F(TestTypes, ArrayTypeDebugString) {
+  {
+    const Slice cell;
+    string out;
+    ArrayTypeTraits<INT8>::AppendDebugStringForValue(&cell, &out);
+    ASSERT_EQ("NULL", out);
+  }
+  {
+    const Slice cell(static_cast<const uint8_t*>(nullptr), 0);
+    string out;
+    ArrayTypeTraits<INT16>::AppendDebugStringForValue(&cell, &out);
+    ASSERT_EQ("NULL", out);
+  }
+  {
+    const vector<int64_t> val;
+    const vector<bool> validity;
+    ASSERT_EQ(val.size(), validity.size());
+
+    unique_ptr<uint8_t[]> buf_data;
+    size_t buf_data_size = 0;
+    ASSERT_OK(Serialize(GetTypeInfo(INT64),
+                        reinterpret_cast<const uint8_t*>(val.data()),
+                        val.size(),
+                        validity,
+                        &buf_data,
+                        &buf_data_size));
+    ASSERT_TRUE(buf_data);
+
+    const Slice cell(buf_data.get(), buf_data_size);
+    string out;
+    ArrayTypeTraits<INT64>::AppendDebugStringForValue(&cell, &out);
+    ASSERT_EQ("[]", out);
+  }
+  {
+    const vector<int32_t> val{ 0, 1, 12, 5, 26, 42, };
+    const vector<bool> validity{ false, true, false, true, true, true, };
+    ASSERT_EQ(val.size(), validity.size());
+
+    unique_ptr<uint8_t[]> buf_data;
+    size_t buf_data_size = 0;
+    ASSERT_OK(Serialize(GetTypeInfo(INT32),
+                        reinterpret_cast<const uint8_t*>(val.data()),
+                        val.size(),
+                        validity,
+                        &buf_data,
+                        &buf_data_size));
+    ASSERT_TRUE(buf_data);
+
+    const Slice cell(buf_data.get(), buf_data_size);
+    string out;
+    ArrayTypeTraits<INT32>::AppendDebugStringForValue(&cell, &out);
+    ASSERT_EQ("[NULL, 1, NULL, 5, 26, 42]", out);
+  }
+  {
+    const vector<Slice> val{ "alphabet", "", "ABC", "mega", "", "turbo", };
+    const vector<bool> validity{ true, false, true, true, true, true, };
+    ASSERT_EQ(val.size(), validity.size());
+
+    unique_ptr<uint8_t[]> buf_data;
+    size_t buf_data_size = 0;
+    ASSERT_OK(Serialize(GetTypeInfo(STRING),
+                        reinterpret_cast<const uint8_t*>(val.data()),
+                        val.size(),
+                        validity,
+                        &buf_data,
+                        &buf_data_size));
+    ASSERT_TRUE(buf_data);
+
+    const Slice cell(buf_data.get(), buf_data_size);
+    string out;
+    ArrayTypeTraits<STRING>::AppendDebugStringForValue(&cell, &out);
+    ASSERT_EQ("[\"alphabet\", NULL, \"ABC\", \"mega\", \"\", \"turbo\"]", out);
+  }
+}
+
+template <typename T, DataType KUDU_TYPE>
+static void CompareArrays(
+    int expected,
+    string_view tag,
+    const vector<T>& lhs_val,
+    const vector<bool>& lhs_validity,
+    const vector<T>& rhs_val,
+    const vector<bool>& rhs_validity) {
+
+  SCOPED_TRACE(tag);
+  ASSERT_EQ(lhs_val.size(), lhs_validity.size());
+  unique_ptr<uint8_t[]> lhs_buf_data;
+  size_t lhs_buf_data_size = 0;
+  ASSERT_OK(Serialize(GetTypeInfo(KUDU_TYPE),
+                      reinterpret_cast<const uint8_t*>(lhs_val.data()),
+                      lhs_val.size(),
+                      lhs_validity,
+                      &lhs_buf_data,
+                      &lhs_buf_data_size));
+  ASSERT_TRUE(lhs_buf_data);
+  const Slice lhs_cell(lhs_buf_data.get(), lhs_buf_data_size);
+
+  ASSERT_EQ(rhs_val.size(), rhs_validity.size());
+  unique_ptr<uint8_t[]> rhs_buf_data;
+  size_t rhs_buf_data_size = 0;
+  ASSERT_OK(Serialize(GetTypeInfo(KUDU_TYPE),
+                      reinterpret_cast<const uint8_t*>(rhs_val.data()),
+                      rhs_val.size(),
+                      rhs_validity,
+                      &rhs_buf_data,
+                      &rhs_buf_data_size));
+  ASSERT_TRUE(rhs_buf_data);
+  const Slice rhs_cell(rhs_buf_data.get(), rhs_buf_data_size);
+
+  ASSERT_EQ(expected, ArrayTypeTraits<KUDU_TYPE>::Compare(&lhs_cell, 
&rhs_cell));
+}
+
+TEST_F(TestTypes, CompareNullArrays) {
+  Slice lhs_cell;;
+  Slice rhs_cell;;
+  ASSERT_EQ(-1, ArrayTypeTraits<INT32>::Compare(&lhs_cell, &rhs_cell));
+  ASSERT_EQ(-1, ArrayTypeTraits<INT32>::Compare(&rhs_cell, &lhs_cell));
+
+  ASSERT_EQ(-1, ArrayTypeTraits<STRING>::Compare(&lhs_cell, &rhs_cell));
+  ASSERT_EQ(-1, ArrayTypeTraits<STRING>::Compare(&rhs_cell, &lhs_cell));
+}
+
+TEST_F(TestTypes, CompareArrays) {
+  NO_FATALS((CompareArrays<Slice, BINARY>(0,
+      "[] = []",
+      {}, {},
+      {}, {})));
+  NO_FATALS((CompareArrays<Slice, STRING>(-1,
+      "[] < [NULL]",
+      {}, {},
+      { "" }, { false })));
+  NO_FATALS((CompareArrays<Slice, BINARY>(1,
+      "[NULL] > []",
+      { "" }, { false },
+      {}, {})));
+  NO_FATALS((CompareArrays<Slice, STRING>(-1,
+      "[NULL, '1'] < ['', '1']",
+      { "0", "1", }, { false, true, },
+      { "", "1", }, { true, true, })));
+  NO_FATALS((CompareArrays<int16_t, INT16>(-1,
+      "[] < [NULL]",
+      {}, {},
+      { 0 }, { false })));
+  NO_FATALS((CompareArrays<int32_t, DATE>(1,
+      "[NULL] > []",
+      { 0 }, { false },
+      {}, {})));
+  NO_FATALS((CompareArrays<int16_t, INT16>(-1,
+      "[] < [-5]",
+      {}, {},
+      { -5 }, { true })));
+  NO_FATALS((CompareArrays<int64_t, UNIXTIME_MICROS>(-1,
+      "[] < [3]",
+      {}, {},
+      { 3 }, { true })));
+  NO_FATALS((CompareArrays<int8_t, INT8>(0,
+      "[NULL] = [NULL]",
+      { 0 }, { false },
+      { 0 }, { false })));
+  NO_FATALS((CompareArrays<int8_t, INT8>(1,
+      "[1] > [NULL]",
+      { 1 }, { true },
+      { 2 }, { false })));
+  NO_FATALS((CompareArrays<int32_t, INT32>(-1,
+      "[-1] < [1]",
+      { -1 }, { true },
+      { 1 }, { true })));
+  NO_FATALS((CompareArrays<int64_t, INT64>(-1,
+      "[NULL] < [-1]",
+      { 1 }, { false },
+      { -1 }, { true })));
+  NO_FATALS((CompareArrays<Slice, STRING>(0,
+      "[NULL, '1', NULL, '42'] = [NULL, '1', NULL, '42']",
+      { "0", "1", "12", "42", }, { false, true, false, true, },
+      { "", "1", "1", "42", }, { false, true, false, true, })));
+  NO_FATALS((CompareArrays<int16_t, INT16>(0,
+      "[NULL, 1, NULL, 5, 26, 42] = [NULL, 1, NULL, 5, 26, 42]",
+      { 0, 1, 12, 5, 26, 42, }, { false, true, false, true, true, true, },
+      { 1, 1, 1, 5, 26, 42, }, { false, true, false, true, true, true, })));
+  NO_FATALS((CompareArrays<int32_t, INT32>(-1,
+      "[NULL, 1, NULL, 5, 26, 42] < [NULL, 1, NULL, 5, 26, 43]",
+      { 0, 1, 0, 5, 26, 42, }, { false, true, false, true, true, true, },
+      { 0, 1, 0, 5, 26, 43, }, { false, true, false, true, true, true, })));
+  NO_FATALS((CompareArrays<int64_t, INT64>(-1,
+      "[NULL, 1, NULL, 5, 25, 42] < [NULL, 1, NULL, 5, 26, 42]",
+      { 0, 1, 0, 5, 25, 42, }, { false, true, false, true, true, true, },
+      { 0, 1, 0, 5, 26, 42, }, { false, true, false, true, true, true, })));
+  NO_FATALS((CompareArrays<Slice, STRING>(-1,
+      "[NULL, '1', NULL] < [NULL, '1', '']",
+      { "0", "1", "12", }, { false, true, false, },
+      { "", "1", "", }, { false, true, true, })));
+  NO_FATALS((CompareArrays<Slice, VARCHAR>(-1,
+      "[NULL, 1] < [NULL, 1, NULL]",
+      { "0", "1", }, { false, true, },
+      { "", "1", "", }, { false, true, false, })));
+  NO_FATALS((CompareArrays<int32_t, INT32>(1,
+      "[NULL, 1, NULL] > [NULL, 0, 2]",
+      { 0, 1, 2 }, { false, true, false },
+      { 0, 0, 2, }, { false, true, true, })));
+  NO_FATALS((CompareArrays<int32_t, INT32>(-1,
+      "[-1, 1, 0] < [1, -2, 1]",
+      { -1, 1, 0, }, { true, true, true, },
+      { 1, -2, 1, }, { true, true, true, })));
+  NO_FATALS((CompareArrays<int16_t, INT16>(-1,
+      "[0, 1, 2, NULL] < [0, 1, 3]",
+      { 0, 1, 2, 0, }, { true, true, true, false, },
+      { 0, 1, 3, }, { true, true, true, })));
+  NO_FATALS((CompareArrays<int64_t, INT64>(-1,
+      "[0, 1, 2, NULL] < [0, 1, 3, NULL]",
+      { 0, 1, 2, 0, }, { true, true, true, false, },
+      { 0, 1, 3, 0, }, { true, true, true, false, })));
+}
+
+template <typename T, DataType KUDU_TYPE>
+static void AreArraysConsecutive(
+    bool expected,
+    string_view tag,
+    const vector<T>& lhs_val,
+    const vector<bool>& lhs_validity,
+    const vector<T>& rhs_val,
+    const vector<bool>& rhs_validity) {
+
+  SCOPED_TRACE(tag);
+  ASSERT_EQ(lhs_val.size(), lhs_validity.size());
+  unique_ptr<uint8_t[]> lhs_buf_data;
+  size_t lhs_buf_data_size = 0;
+  ASSERT_OK(Serialize(GetTypeInfo(KUDU_TYPE),
+                      reinterpret_cast<const uint8_t*>(lhs_val.data()),
+                      lhs_val.size(),
+                      lhs_validity,
+                      &lhs_buf_data,
+                      &lhs_buf_data_size));
+  ASSERT_TRUE(lhs_buf_data);
+  const Slice lhs_cell(lhs_buf_data.get(), lhs_buf_data_size);
+
+  ASSERT_EQ(rhs_val.size(), rhs_validity.size());
+  unique_ptr<uint8_t[]> rhs_buf_data;
+  size_t rhs_buf_data_size = 0;
+  ASSERT_OK(Serialize(GetTypeInfo(KUDU_TYPE),
+                      reinterpret_cast<const uint8_t*>(rhs_val.data()),
+                      rhs_val.size(),
+                      rhs_validity,
+                      &rhs_buf_data,
+                      &rhs_buf_data_size));
+  ASSERT_TRUE(rhs_buf_data);
+  const Slice rhs_cell(rhs_buf_data.get(), rhs_buf_data_size);
+
+  ASSERT_EQ(expected,
+            ArrayTypeTraits<KUDU_TYPE>::AreConsecutive(&lhs_cell, &rhs_cell));
+}
+
+TEST_F(TestTypes, ConsecutiveArrays) {
+  NO_FATALS((AreArraysConsecutive<int8_t, INT8>(false,
+      "[-1, 1] and [-1, 1]",
+      { -1, 1, }, { true, true, },
+      { -1, 1, }, { true, true, })));
+  NO_FATALS((AreArraysConsecutive<int16_t, INT16>(false,
+      "[-1, 1] and [-1, 1, 0]",
+      { -1, 1, }, { true, true, },
+      { -1, 1, 0, }, { true, true, true, })));
+  NO_FATALS((AreArraysConsecutive<int32_t, INT32>(true,
+      "[] and [NULL]",
+      {}, {},
+      { 0, }, { false, })));
+  NO_FATALS((AreArraysConsecutive<int64_t, INT64>(false,
+      "[NULL] and []",
+      { 0, }, { false, },
+      {}, {})));
+  NO_FATALS((AreArraysConsecutive<int8_t, INT8>(true,
+      "[1] and [1, NULL]",
+      { 1, }, { true, },
+      { 1, 0, }, { true, false, })));
+  NO_FATALS((AreArraysConsecutive<int32_t, INT32>(false,
+      "[1, NULL] and [1]",
+      { 1, 0, }, { true, false, },
+      { 1, }, { true, })));
+  NO_FATALS((AreArraysConsecutive<int16_t, INT16>(false,
+      "[-1, 1, NULL] and [-1, 1, 0]",
+      { -1, 1, 0, }, { true, true, false, },
+      { -1, 1, 0, }, { true, true, true, })));
+  NO_FATALS((AreArraysConsecutive<int16_t, INT16>(false,
+      "[-1, 1, 0] and [-1, 1, NULL]",
+      { -1, 1, 0, }, { true, true, true, },
+      { -1, 1, 0, }, { true, true, false, })));
+}
+
 } // namespace kudu
diff --git a/src/kudu/common/types.h b/src/kudu/common/types.h
index e0e7cd0ac..7790867f5 100644
--- a/src/kudu/common/types.h
+++ b/src/kudu/common/types.h
@@ -30,6 +30,7 @@
 
 #include <glog/logging.h>
 
+#include "kudu/common/array_cell_view.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/mathlimits.h"
@@ -916,24 +917,125 @@ struct TypeTraits : public DataTypeTraits<datatype> {
 template<DataType ARRAY_ELEMENT_TYPE>
 struct ArrayTypeTraits : public ArrayDataTypeTraits<ARRAY_ELEMENT_TYPE> {
   typedef Slice cpp_type;
+  typedef typename TypeTraits<ARRAY_ELEMENT_TYPE>::cpp_type element_cpp_type;
 
   static const DataType type = DataType::NESTED;
   static const DataType physical_type = DataType::BINARY;
+  static const DataType element_type = ARRAY_ELEMENT_TYPE;
 
   static const size_t size = sizeof(Slice);
 
   static void AppendDebugStringForValue(const void* val, std::string* str) {
-    // TODO(aserbin): implement once ArrayCellMetadataView is available
-    str->append("not implemented yet for ");
-    str->append(ArrayDataTypeTraits<ARRAY_ELEMENT_TYPE>::name());
+    static constexpr const char* const kErrCorruption = "corrupted array cell 
data";
+    const Slice* cell_ptr = reinterpret_cast<const Slice*>(val);
+    if (!cell_ptr->data() || cell_ptr->empty()) {
+      // This is a NULL array cell.
+      str->append("NULL");
+      return;
+    }
+    ArrayCellMetadataView view(cell_ptr->data(), cell_ptr->size());
+    const auto s = view.Init();
+    DCHECK(s.ok());
+    if (PREDICT_FALSE(!s.ok())) {
+      str->append(kErrCorruption);
+      return;
+    }
+    const size_t elem_num = view.elem_num();
+    if (elem_num == 0) {
+      str->append("[]");
+      return;
+    }
+    DCHECK_NE(0, elem_num);
+    const uint8_t* data_ptr = view.data_as(ARRAY_ELEMENT_TYPE);
+    if (PREDICT_FALSE(!data_ptr)) {
+      str->append(kErrCorruption);
+      return;
+    }
+    str->append("[");
+    const auto* validity = view.not_null_bitmap();
+    for (size_t idx = 0; idx < elem_num; ++idx) {
+      if (idx != 0) {
+        str->append(", ");
+      }
+      if (BitmapTest(validity, idx)) {
+        
DataTypeTraits<ARRAY_ELEMENT_TYPE>::AppendDebugStringForValue(data_ptr, str);
+      } else {
+        str->append("NULL");
+      }
+      data_ptr += sizeof(typename TypeTraits<ARRAY_ELEMENT_TYPE>::cpp_type);
+    }
+    str->append("]");
   }
+
+  // Compare two arrays. If any of the array cells is null, the arrays aren't
+  // equal: this follows the standard notation of comparison of two NULLs in
+  // SQL: 'NULL = NULL' evaluates to 'false'. However: '[NULL] = [NULL]'
+  // evaluates to 'true'.
   static int Compare(const void* lhs, const void* rhs) {
-    // TODO(aserbin): implement once ArrayCellMetadataView is available
-    return -1;
+    const Slice* lhs_cell_ptr = reinterpret_cast<const Slice*>(lhs);
+    if (!lhs_cell_ptr->data() || lhs_cell_ptr->empty()) {
+      return -1;
+    }
+    const Slice* rhs_cell_ptr = reinterpret_cast<const Slice*>(rhs);
+    if (!rhs_cell_ptr->data() || rhs_cell_ptr->empty()) {
+      return -1;
+    }
+
+    ArrayCellMetadataView lhs_view(lhs_cell_ptr->data(), lhs_cell_ptr->size());
+    if (const auto s = lhs_view.Init(); PREDICT_FALSE(!s.ok())) {
+      DCHECK(false) << s.ToString();
+      return -1;
+    }
+    ArrayCellMetadataView rhs_view(rhs_cell_ptr->data(), rhs_cell_ptr->size());
+    if (const auto s = rhs_view.Init(); PREDICT_FALSE(!s.ok())) {
+      DCHECK(false) << s.ToString();
+      return -1;
+    }
+    return Compare(lhs_view, rhs_view, std::numeric_limits<size_t>::max());
   }
+
+  // Return true if increment(a) is equal to b. For arrays, let's define
+  // increment as adding an extra NULL element in the end. So, two arrays
+  // are consecutive if the longer one is equal to the shorter one with an
+  // additional trailing NULL element. This is consistent with the array
+  // comparison rules above, where [0, 1] < [0, 1, NULL],
+  // but [0, 1, NULL] < [0, 2].
   static bool AreConsecutive(const void* a, const void* b) {
-    // TODO(aserbin): implement once ArrayCellMetadataView is available
-    return false;
+
+    // If any of the arrays is null, they cannot be consecutive.
+    const Slice* a_cell_ptr = reinterpret_cast<const Slice*>(a);
+    if (!a_cell_ptr->data() || a_cell_ptr->empty()) {
+      return false;
+    }
+    const Slice* b_cell_ptr = reinterpret_cast<const Slice*>(b);
+    if (!b_cell_ptr->data() || b_cell_ptr->empty()) {
+      return false;
+    }
+
+    ArrayCellMetadataView a_view(a_cell_ptr->data(), a_cell_ptr->size());
+    if (const auto s = a_view.Init(); PREDICT_FALSE(!s.ok())) {
+      DCHECK(false) << s.ToString();
+      return false;
+    }
+    ArrayCellMetadataView b_view(b_cell_ptr->data(), b_cell_ptr->size());
+    if (const auto s = b_view.Init(); PREDICT_FALSE(!s.ok())) {
+      DCHECK(false) << s.ToString();
+      return false;
+    }
+
+    const size_t a_elem_num = a_view.elem_num();
+    const size_t b_elem_num = b_view.elem_num();
+    if (a_elem_num + 1 != b_elem_num) {
+      return false;
+    }
+
+    const uint8_t* b_not_null_bitmap = b_view.not_null_bitmap();
+    if (BitmapTest(b_not_null_bitmap, a_elem_num)) {
+      // The trailing extra element in 'b' must be NULL if 'b' goes
+      // consecutively after 'a'.
+      return false;
+    }
+    return Compare(a_view, b_view, a_elem_num) == 0;
   }
   static const cpp_type* min_value() {
     static const cpp_type kMinVal{};
@@ -942,9 +1044,83 @@ struct ArrayTypeTraits : public ArrayDataTypeTraits<ARRAY_ELEMENT_TYPE> {
   static const cpp_type* max_value() {
     return nullptr;
   }
-  static bool IsVirtual() {
+  static constexpr bool IsVirtual() {
     return false;
   }
+
+ private:
+  // Compare the arrays represented by the specified array view facades up to
+  // the specified number of elements. The comparison goes element-by-element,
+  // using Compare() method for the corresponding element scalar type.
+  static int Compare(const ArrayCellMetadataView& lhs,
+                     const ArrayCellMetadataView& rhs,
+                     size_t num_elems_to_compare) {
+
+    if (num_elems_to_compare == 0) {
+      return 0;
+    }
+
+    const size_t lhs_elems = lhs.elem_num();
+    const size_t rhs_elems = rhs.elem_num();
+
+    // The number of array elements available for comparison in each of the arrays.
+    const size_t num_elems = std::min(lhs_elems, rhs_elems);
+    // The number of array elements available for comparison in each of the
+    // arrays, additionally capped by the 'num_elems_to_compare' parameter.
+    const size_t cap_num_elems = std::min(num_elems, num_elems_to_compare);
+
+    const uint8_t* lhs_data_ptr = lhs.data_as(ARRAY_ELEMENT_TYPE);
+    DCHECK(lhs_data_ptr || lhs_elems == 0);
+    const uint8_t* lhs_not_null_bitmap = lhs.not_null_bitmap();
+    DCHECK(lhs_not_null_bitmap || lhs_elems == 0);
+
+    const uint8_t* rhs_data_ptr = rhs.data_as(ARRAY_ELEMENT_TYPE);
+    DCHECK(rhs_data_ptr || rhs_elems == 0);
+    const uint8_t* rhs_not_null_bitmap = rhs.not_null_bitmap();
+    DCHECK(rhs_not_null_bitmap || rhs_elems == 0);
+
+    for (size_t idx = 0; idx < cap_num_elems; ++idx,
+        lhs_data_ptr += sizeof(typename 
TypeTraits<ARRAY_ELEMENT_TYPE>::cpp_type),
+        rhs_data_ptr += sizeof(typename 
TypeTraits<ARRAY_ELEMENT_TYPE>::cpp_type)) {
+      if (!BitmapTest(lhs_not_null_bitmap, idx)) {
+        if (!BitmapTest(rhs_not_null_bitmap, idx)) {
+          // Both elements are NULL: continue with next elements in the arrays.
+          continue;
+        }
+        // lhs < rhs: a non-NULL element in rhs at idx, but lhs has NULL
+        // at idx, while all the pairs of elements in the arrays contain
+        // same numbers (or two NULLs) up to the 'idx' position.
+        return -1;
+      } else {
+        if (!BitmapTest(rhs_not_null_bitmap, idx)) {
+          // lhs > rhs: a non-NULL element in lhs at idx, but rhs has NULL
+          // at idx, while all the pairs of elements in the arrays contain
+          // same numbers (or two NULLs) up to the 'idx' position.
+          return 1;
+        }
+        // OK, it's time to compare two non-null elements at same index.
+        const int res = DataTypeTraits<ARRAY_ELEMENT_TYPE>::Compare(
+            lhs_data_ptr, rhs_data_ptr);
+        if (res != 0) {
+          return res;
+        }
+      }
+    }
+    // At this point, all pairs of elements up to 'cap_num_elems' idx contain
+    // same values or two NULLs.
+    if (num_elems >= num_elems_to_compare) {
+      return 0;
+    }
+    DCHECK(lhs_elems <= num_elems_to_compare && rhs_elems <= 
num_elems_to_compare);
+    if (lhs_elems < rhs_elems) {
+      return -1;
+    }
+    if (lhs_elems > rhs_elems) {
+      return 1;
+    }
+
+    return 0;
+  }
 };
 
 class Variant final {
diff --git a/src/kudu/util/bitmap-test.cc b/src/kudu/util/bitmap-test.cc
index dd71750a4..948bf1bc0 100644
--- a/src/kudu/util/bitmap-test.cc
+++ b/src/kudu/util/bitmap-test.cc
@@ -17,10 +17,12 @@
 
 #include "kudu/util/bitmap.h"
 
+#include <algorithm>
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
 #include <set>
+#include <string>
 #include <vector>
 
 #include <gtest/gtest.h>
@@ -380,6 +382,52 @@ TEST(TestBitMap, TestCopy) {
   }
 }
 
+TEST(TestBitMap, BitmapToVector) {
+  constexpr size_t kNumBytes = 8;
+  constexpr size_t kNumBits = kNumBytes * 8;
+  constexpr uint8_t kAllZeroes[kNumBytes] = { 0 };
+
+  {
+    const auto v = BitmapToVector(kAllZeroes, kNumBits);
+    ASSERT_EQ(kNumBits, v.size());
+    ASSERT_TRUE(std::all_of(v.begin(), v.end(), [](bool e) { return !e; }));
+  }
+  {
+    uint8_t input[kNumBytes];
+    BitmapCopy(input, 0, kAllZeroes, 0, kNumBits);
+
+    BitmapChangeBits(input, 0, kNumBits, true);
+    const auto v = BitmapToVector(input, kNumBits);
+    ASSERT_EQ(kNumBits, v.size());
+    ASSERT_TRUE(std::all_of(v.begin(), v.end(), [](bool e) { return e; }));
+  }
+  {
+    uint8_t input[kNumBytes];
+    BitmapCopy(input, 0, kAllZeroes, 0, kNumBits);
+
+    BitmapChangeBits(input, 0, kNumBits / 2, true);
+    const auto v = BitmapToVector(input, kNumBits);
+    ASSERT_EQ(kNumBits, v.size());
+    auto it_half = v.begin() + (v.size() / 2);
+    ASSERT_TRUE(std::all_of(v.begin(), it_half, [](bool e) { return e; }));
+    ASSERT_TRUE(std::all_of(it_half, v.end(), [](bool e) { return !e; }));
+  }
+  {
+    uint8_t input[1] = { 0b00000001 };
+    const auto v = BitmapToVector(input, 1);
+    ASSERT_EQ(1, v.size());
+    ASSERT_TRUE(v.front());
+  }
+  {
+    uint8_t input[1] = { 0b10000001 };
+    const auto v = BitmapToVector(input, 8);
+    ASSERT_EQ(8, v.size());
+    ASSERT_TRUE(v.front());
+    ASSERT_TRUE(v.back());
+    ASSERT_TRUE(std::all_of(v.begin() + 1, v.end() - 1, [](bool e) { return 
!e; }));
+  }
+}
+
 #ifndef NDEBUG
 TEST(TestBitMapDeathTest, TestCopyOverlap) {
   uint8_t bm[2] = { 0 };
diff --git a/src/kudu/util/bitmap.cc b/src/kudu/util/bitmap.cc
index 67c431e03..9de95ca72 100644
--- a/src/kudu/util/bitmap.cc
+++ b/src/kudu/util/bitmap.cc
@@ -20,12 +20,14 @@
 #include <cstring>
 #include <ostream>
 #include <string>
+#include <vector>
 
 #include <glog/logging.h>
 
 #include "kudu/gutil/stringprintf.h"
 
 using std::string;
+using std::vector;
 
 namespace kudu {
 
@@ -152,6 +154,20 @@ void BitmapCopy(uint8_t* dst, size_t dst_offset,
   }
 }
 
+vector<bool> BitmapToVector(const uint8_t* bitmap, size_t num_bits) {
+  BitmapIterator it(bitmap, num_bits);
+  vector<bool> result(num_bits);
+  bool is_set = false;
+  size_t offset = 0;
+  while (const size_t num_elem = it.Next(&is_set)) {
+    for (size_t i = 0; i < num_elem; ++i) {
+      result[offset + i] = is_set;
+    }
+    offset += num_elem;
+  }
+  return result;
+}
+
 string BitmapToString(const uint8_t* bitmap, size_t num_bits) {
   string s;
   size_t index = 0;
diff --git a/src/kudu/util/bitmap.h b/src/kudu/util/bitmap.h
index c63cfb470..96b078b24 100644
--- a/src/kudu/util/bitmap.h
+++ b/src/kudu/util/bitmap.h
@@ -22,6 +22,7 @@
 #include <cstddef>
 #include <cstdint>
 #include <string>
+#include <vector>
 
 #include <glog/logging.h>
 
@@ -124,6 +125,8 @@ void BitmapCopy(uint8_t* dst, size_t dst_offset,
                 const uint8_t* src, size_t src_offset,
                 size_t num_bits);
 
+std::vector<bool> BitmapToVector(const uint8_t* bitmap, size_t num_bits);
+
 std::string BitmapToString(const uint8_t* bitmap, size_t num_bits);
 
 // Iterator which yields ranges of set and unset bits.

Reply via email to