bkietz commented on a change in pull request #8894:
URL: https://github.com/apache/arrow/pull/8894#discussion_r545531465



##########
File path: cpp/src/arrow/dataset/partition.cc
##########
@@ -573,5 +530,192 @@ Result<std::shared_ptr<Schema>> 
PartitioningOrFactory::GetOrInferSchema(
   return factory()->Inspect(paths);
 }
 
+// Turn an array of slice lengths into the offsets that divide a ListArray
+// into the same number of slices, each with the corresponding length.
+//
+// The result holds counts->length() + 1 int32 offsets: it starts at 0 and
+// every subsequent entry is the previous one plus the matching count.
+// Each count is asserted (DCHECK_NE) to be non-zero.
+inline Result<std::shared_ptr<Array>> CountsToOffsets(
+    std::shared_ptr<Int64Array> counts) {
+  const int64_t num_counts = counts->length();
+
+  Int32Builder offset_builder;
+  RETURN_NOT_OK(offset_builder.Resize(num_counts + 1));
+
+  // The first offset is always zero; the rest are the running total of counts.
+  int32_t running_total = 0;
+  offset_builder.UnsafeAppend(running_total);
+
+  for (int64_t slot = 0; slot < num_counts; ++slot) {
+    const auto count = counts->Value(slot);
+    DCHECK_NE(count, 0);
+    // Sum in 64 bits, then narrow to the 32 bit offset type.
+    running_total = static_cast<int32_t>(running_total + count);
+    offset_builder.UnsafeAppend(running_total);
+  }
+
+  std::shared_ptr<Array> offsets;
+  RETURN_NOT_OK(offset_builder.Finish(&offsets));
+  return offsets;
+}
+
+// Helper for simultaneous dictionary encoding of multiple arrays.
+//
+// The fused dictionary is the Cartesian product of the individual 
dictionaries.
+// For example given two arrays A, B where A has unique values ["ex", "why"]
+// and B has unique values [0, 1] the fused dictionary is the set of tuples
+// [["ex", 0], ["ex", 1], ["why", 0], ["why", 1]].
+//
+// TODO(bkietz) this capability belongs in an Action of the hash kernels, where
+// it can be used to group aggregates without materializing a grouped batch.
+// For the purposes of writing we need the materialized grouped batch anyway
+// since no Writers accept a selection vector.
+class StructDictionary {
+ public:
+  struct Encoded {  // Value returned by Encode(): fused indices plus their dictionary.
+    std::shared_ptr<Int32Array> indices;  // fused indices, filled in by AddOne() during Encode()
+    std::shared_ptr<StructDictionary> dictionary;  // dictionary built up by Encode(), one AddOne() per column
+  };
+
+  // Dictionary-encode all of `columns` together.
+  //
+  // Each column is folded into a fresh StructDictionary with AddOne(), which
+  // also updates the fused indices. Returns NotImplemented as soon as a column
+  // containing nulls is encountered; any AddOne() failure is propagated.
+  static Result<Encoded> Encode(const ArrayVector& columns) {
+    Encoded encoded;
+    encoded.dictionary = std::make_shared<StructDictionary>();
+
+    for (size_t i = 0; i < columns.size(); ++i) {
+      const auto& column = columns[i];
+
+      const bool has_nulls = column->null_count() != 0;
+      if (has_nulls) {
+        return Status::NotImplemented("Grouping on a field with nulls");
+      }
+
+      RETURN_NOT_OK(encoded.dictionary->AddOne(column, &encoded.indices));
+    }
+
+    return encoded;
+  }
+
+  // Rebuild a StructArray from fused indices.
+  //
+  // Every fused index is split with Expand() into one code per dictionary;
+  // the codes of each dictionary are then used to Take() from it, and the
+  // resulting columns are assembled into a StructArray with `fields`.
+  Result<std::shared_ptr<StructArray>> Decode(std::shared_ptr<Int32Array> fused_indices,
+                                              FieldVector fields) {
+    const int64_t num_rows = fused_indices->length();
+
+    // One index builder per dictionary, pre-sized so UnsafeAppend is valid.
+    std::vector<Int32Builder> builders(dictionaries_.size());
+    for (size_t col = 0; col < builders.size(); ++col) {
+      RETURN_NOT_OK(builders[col].Resize(num_rows));
+    }
+
+    // Scratch space which Expand() fills with the codes for a single row.
+    std::vector<int32_t> codes(dictionaries_.size());
+    for (int64_t row = 0; row < num_rows; ++row) {
+      Expand(fused_indices->Value(row), codes.data());
+
+      for (size_t col = 0; col < codes.size(); ++col) {
+        builders[col].UnsafeAppend(codes[col]);
+      }
+    }
+
+    ArrayVector columns(dictionaries_.size());
+    for (size_t col = 0; col < dictionaries_.size(); ++col) {
+      std::shared_ptr<ArrayData> indices;
+      RETURN_NOT_OK(builders[col].FinishInternal(&indices));
+
+      ARROW_ASSIGN_OR_RAISE(Datum column, compute::Take(dictionaries_[col], indices));
+      columns[col] = column.make_array();
+    }
+
+    return StructArray::Make(std::move(columns), std::move(fields));
+  }
+
+ private:
+  Status AddOne(Datum column, std::shared_ptr<Int32Array>* fused_indices) {
+    ArrayData* encoded;
+    if (column.type()->id() != Type::DICTIONARY) {
+      ARROW_ASSIGN_OR_RAISE(column, compute::DictionaryEncode(column));
+    }
+    encoded = column.mutable_array();
+
+    auto indices =
+        std::make_shared<Int32Array>(encoded->length, 
std::move(encoded->buffers[1]));
+
+    dictionaries_.push_back(MakeArray(std::move(encoded->dictionary)));
+    auto dictionary_size = 
static_cast<int32_t>(dictionaries_.back()->length());
+
+    if (*fused_indices == nullptr) {
+      *fused_indices = std::move(indices);
+      size_ = dictionary_size;
+      return Status::OK();
+    }
+
+    // It's useful to think about the case where each of dictionaries_ has 
size 10.
+    // In this case the decimal digit in the ones place is the code in 
dictionaries_[0],
+    // the tens place corresponds to dictionaries_[1], etc.
+    // The incumbent indices must be shifted to the hundreds place so as not 
to collide.
+    ARROW_ASSIGN_OR_RAISE(Datum new_fused_indices,
+                          compute::Multiply(indices, MakeScalar(size_)));
+
+    ARROW_ASSIGN_OR_RAISE(new_fused_indices,
+                          compute::Add(new_fused_indices, *fused_indices));
+
+    *fused_indices = 
checked_pointer_cast<Int32Array>(new_fused_indices.make_array());
+
+    // XXX should probably cap this at 2**15 or so
+    ARROW_CHECK(!internal::MultiplyWithOverflow(size_, dictionary_size, 
&size_));

Review comment:
       I'll return an error instead.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to