pitrou commented on a change in pull request #7544:
URL: https://github.com/apache/arrow/pull/7544#discussion_r447729043



##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1228,6 +1228,152 @@ TEST_P(TestFileFormat, RoundTrip) {
   TestZeroLengthRoundTrip(*GetParam(), options);
 }
 
+Status MakeDictionaryBatch(std::shared_ptr<RecordBatch>* out) {
+  const int64_t length = 6;
+
+  std::vector<bool> is_valid = {true, true, false, true, true, true};
+
+  auto dict_ty = utf8();
+
+  auto dict = ArrayFromJSON(dict_ty, "[\"foo\", \"bar\", \"baz\"]");
+
+  auto f0_type = arrow::dictionary(arrow::int32(), dict_ty);
+  auto f1_type = arrow::dictionary(arrow::int8(), dict_ty);
+
+  std::shared_ptr<Array> indices0, indices1;
+  std::vector<int32_t> indices0_values = {1, 2, -1, 0, 2, 0};
+  std::vector<int8_t> indices1_values = {0, 0, 2, 2, 1, 1};
+
+  ArrayFromVector<Int32Type, int32_t>(is_valid, indices0_values, &indices0);
+  ArrayFromVector<Int8Type, int8_t>(is_valid, indices1_values, &indices1);

Review comment:
       You can create the indices arrays with `ArrayFromJSON` too; it will 
probably make the code simpler to read and maintain.

##########
File path: cpp/src/arrow/ipc/metadata_internal.h
##########
@@ -198,7 +198,7 @@ Status WriteDictionaryMessage(
     const int64_t id, const int64_t length, const int64_t body_length,
     const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
     const std::vector<FieldMetadata>& nodes, const 
std::vector<BufferMetadata>& buffers,
-    const IpcWriteOptions& options, std::shared_ptr<Buffer>* out);
+    const IpcWriteOptions& options, std::shared_ptr<Buffer>* out, bool 
isDelta);

Review comment:
       We should keep `std::shared_ptr<Buffer>* out` at the end of the arguments.

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -684,7 +685,19 @@ Status ReadDictionary(const Buffer& metadata, 
DictionaryMemo* dictionary_memo,
     return Status::Invalid("Dictionary record batch must only contain one 
field");
   }
   auto dictionary = batch->column(0);
-  return dictionary_memo->AddDictionary(id, dictionary);
+  if (dictionary_batch->isDelta()) {
+    std::shared_ptr<Array> originalDict, combinedDict;
+    RETURN_NOT_OK(dictionary_memo->GetDictionary(id, &originalDict));
+    ArrayVector dictsToCombine{originalDict, dictionary};
+    ARROW_ASSIGN_OR_RAISE(combinedDict, Concatenate(dictsToCombine, 
options.memory_pool));

Review comment:
       How about folding this logic into `DictionaryMemo`, e.g. by adding an 
`AddDictionaryDelta` method?

##########
File path: cpp/src/arrow/ipc/writer.h
##########
@@ -341,6 +343,29 @@ class ARROW_EXPORT IpcPayloadWriter {
   virtual Status Close() = 0;
 };
 
+/// Create a new IPC payload stream writer from stream sink. User is
+/// responsible for closing the actual OutputStream.
+///
+/// \param[in] sink output stream to write to
+/// \param[in] options options for serialization
+/// \return Result<std::shared_ptr<IpcPayloadWriter>>
+ARROW_EXPORT
+Result<std::unique_ptr<IpcPayloadWriter>> NewPayloadStreamWriter(

Review comment:
       Nit: call this `MakePayloadStreamWriter`

##########
File path: cpp/src/arrow/ipc/writer.h
##########
@@ -341,6 +343,29 @@ class ARROW_EXPORT IpcPayloadWriter {
   virtual Status Close() = 0;
 };
 
+/// Create a new IPC payload stream writer from stream sink. User is
+/// responsible for closing the actual OutputStream.
+///
+/// \param[in] sink output stream to write to
+/// \param[in] options options for serialization
+/// \return Result<std::shared_ptr<IpcPayloadWriter>>
+ARROW_EXPORT
+Result<std::unique_ptr<IpcPayloadWriter>> NewPayloadStreamWriter(
+    io::OutputStream* sink, const IpcWriteOptions& options = 
IpcWriteOptions::Defaults());
+
+/// Create a new IPC payload file writer from stream sink.
+///
+/// \param[in] sink output stream to write to
+/// \param[in] schema the schema of the record batches to be written
+/// \param[in] options options for serialization, optional
+/// \param[in] metadata custom metadata for File Footer, optional
+/// \return Status
+ARROW_EXPORT
+Result<std::unique_ptr<IpcPayloadWriter>> NewPayloadFileWriter(

Review comment:
       Nit: call this `MakePayloadFileWriter`

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1228,6 +1228,152 @@ TEST_P(TestFileFormat, RoundTrip) {
   TestZeroLengthRoundTrip(*GetParam(), options);
 }
 
+Status MakeDictionaryBatch(std::shared_ptr<RecordBatch>* out) {
+  const int64_t length = 6;
+
+  std::vector<bool> is_valid = {true, true, false, true, true, true};
+
+  auto dict_ty = utf8();
+
+  auto dict = ArrayFromJSON(dict_ty, "[\"foo\", \"bar\", \"baz\"]");
+
+  auto f0_type = arrow::dictionary(arrow::int32(), dict_ty);
+  auto f1_type = arrow::dictionary(arrow::int8(), dict_ty);
+
+  std::shared_ptr<Array> indices0, indices1;
+  std::vector<int32_t> indices0_values = {1, 2, -1, 0, 2, 0};
+  std::vector<int8_t> indices1_values = {0, 0, 2, 2, 1, 1};
+
+  ArrayFromVector<Int32Type, int32_t>(is_valid, indices0_values, &indices0);
+  ArrayFromVector<Int8Type, int8_t>(is_valid, indices1_values, &indices1);
+
+  auto a0 = std::make_shared<DictionaryArray>(f0_type, indices0, dict);
+  auto a1 = std::make_shared<DictionaryArray>(f1_type, indices1, dict);
+
+  // construct batch
+  auto schema = ::arrow::schema({field("dict1", f0_type), field("dict2", 
f1_type)});
+
+  *out = RecordBatch::Make(schema, length, {a0, a1});
+  return Status::OK();
+}
+
+// A record batch writer implementation that supports manually specifying 
dictionaries.
+class TestRecordBatchWriter : public RecordBatchWriter {

Review comment:
       `TestRecordBatchWriter` makes it look like a test case. Perhaps call it 
`CustomRecordBatchWriter`?
   (also, I'm not sure it makes sense to inherit from `RecordBatchWriter`)

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -684,7 +685,19 @@ Status ReadDictionary(const Buffer& metadata, 
DictionaryMemo* dictionary_memo,
     return Status::Invalid("Dictionary record batch must only contain one 
field");
   }
   auto dictionary = batch->column(0);
-  return dictionary_memo->AddDictionary(id, dictionary);
+  if (dictionary_batch->isDelta()) {
+    std::shared_ptr<Array> originalDict, combinedDict;
+    RETURN_NOT_OK(dictionary_memo->GetDictionary(id, &originalDict));
+    ArrayVector dictsToCombine{originalDict, dictionary};
+    ARROW_ASSIGN_OR_RAISE(combinedDict, Concatenate(dictsToCombine, 
options.memory_pool));
+    return dictionary_memo->UpdateDictionary(id, combinedDict);
+  }
+
+  if (dictionary_memo->HasDictionary(id)) {

Review comment:
       Instead of spelling this out manually here, perhaps add a 
`ReplaceDictionary` method to `DictionaryMemo`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to