This is an automated email from the ASF dual-hosted git repository.

zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 646015857 [VL] Fix timestamp precision loss in serializer (#5376)
646015857 is described below

commit 6460158578f2264b2abfa8c4f5648d96373cde41
Author: Zhen Li <[email protected]>
AuthorDate: Mon Apr 15 12:04:44 2024 +0800

    [VL] Fix timestamp precision loss in serializer (#5376)
    
    [VL] Fix timestamp precision loss in serializer
---
 .../test/scala/org/apache/gluten/execution/TestOperator.scala | 11 +++++++++++
 .../operators/serializer/VeloxColumnarBatchSerializer.cc      |  5 +++--
 cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h |  1 +
 cpp/velox/shuffle/VeloxShuffleReader.cc                       |  4 +++-
 cpp/velox/shuffle/VeloxShuffleWriter.cc                       |  6 +++---
 cpp/velox/shuffle/VeloxShuffleWriter.h                        |  2 ++
 6 files changed, 23 insertions(+), 6 deletions(-)

diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala 
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index c3e14ef19..a2a4b0c4a 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -1307,4 +1307,15 @@ class TestOperator extends 
VeloxWholeStageTransformerSuite {
     assert(plan1.find(_.isInstanceOf[ProjectExecTransformer]).isDefined)
     assert(plan2.find(_.isInstanceOf[ProjectExecTransformer]).isDefined)
   }
+
+  test("timestamp broadcast join") {
+    spark.range(0, 5).createOrReplaceTempView("right")
+    spark.sql("SELECT id, timestamp_micros(id) as ts from 
right").createOrReplaceTempView("left")
+    val expected = spark.sql("SELECT unix_micros(ts) from left")
+    val df = spark.sql(
+      "SELECT unix_micros(ts)" +
+        " FROM left RIGHT OUTER JOIN right ON left.id = right.id")
+    // Verify there is no precision loss for timestamp columns after data 
broadcast.
+    checkAnswer(df, expected)
+  }
 }
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc 
b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
index d0ebd8057..0f2cb9a56 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
@@ -50,6 +50,7 @@ VeloxColumnarBatchSerializer::VeloxColumnarBatchSerializer(
     ArrowSchemaRelease(cSchema); // otherwise the c schema leaks memory
   }
   serde_ = std::make_unique<serializer::presto::PrestoVectorSerde>();
+  options_.useLosslessTimestamp = true;
 }
 
 std::shared_ptr<arrow::Buffer> 
VeloxColumnarBatchSerializer::serializeColumnarBatches(
@@ -60,7 +61,7 @@ std::shared_ptr<arrow::Buffer> 
VeloxColumnarBatchSerializer::serializeColumnarBa
   auto numRows = firstRowVector->size();
   auto arena = std::make_unique<StreamArena>(veloxPool_.get());
   auto rowType = asRowType(firstRowVector->type());
-  auto serializer = serde_->createIterativeSerializer(rowType, numRows, 
arena.get(), /* serdeOptions */ nullptr);
+  auto serializer = serde_->createIterativeSerializer(rowType, numRows, 
arena.get(), &options_);
   for (auto& batch : batches) {
     auto rowVector = VeloxColumnarBatch::from(veloxPool_.get(), 
batch)->getRowVector();
     numRows = rowVector->size();
@@ -84,7 +85,7 @@ std::shared_ptr<arrow::Buffer> 
VeloxColumnarBatchSerializer::serializeColumnarBa
 std::shared_ptr<ColumnarBatch> 
VeloxColumnarBatchSerializer::deserialize(uint8_t* data, int32_t size) {
   RowVectorPtr result;
   auto byteStream = toByteStream(data, size);
-  serde_->deserialize(byteStream.get(), veloxPool_.get(), rowType_, &result, 
/* serdeOptions */ nullptr);
+  serde_->deserialize(byteStream.get(), veloxPool_.get(), rowType_, &result, 
&options_);
   return std::make_shared<VeloxColumnarBatch>(result);
 }
 } // namespace gluten
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h 
b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
index 79e247c67..18539a245 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
@@ -41,6 +41,7 @@ class VeloxColumnarBatchSerializer final : public 
ColumnarBatchSerializer {
   std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
   facebook::velox::RowTypePtr rowType_;
   std::unique_ptr<facebook::velox::serializer::presto::PrestoVectorSerde> 
serde_;
+  facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions 
options_;
 };
 
 } // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc 
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 950a2a05b..5c4e01936 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -183,7 +183,9 @@ RowVectorPtr readComplexType(BufferPtr buffer, RowTypePtr& 
rowType, memory::Memo
   RowVectorPtr result;
   auto byteStream = toByteStream(const_cast<uint8_t*>(buffer->as<uint8_t>()), 
buffer->size());
   auto serde = std::make_unique<serializer::presto::PrestoVectorSerde>();
-  serde->deserialize(byteStream.get(), pool, rowType, &result, /* serdeOptions 
*/ nullptr);
+  serializer::presto::PrestoVectorSerde::PrestoOptions options;
+  options.useLosslessTimestamp = true;
+  serde->deserialize(byteStream.get(), pool, rowType, &result, &options);
   return result;
 }
 
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxShuffleWriter.cc
index 909d4ebc3..8b69471e7 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc
@@ -274,8 +274,8 @@ void VeloxShuffleWriter::setPartitionBufferSize(uint16_t 
newSize) {
 arrow::Result<std::shared_ptr<arrow::Buffer>> 
VeloxShuffleWriter::generateComplexTypeBuffers(
     facebook::velox::RowVectorPtr vector) {
   auto arena = 
std::make_unique<facebook::velox::StreamArena>(veloxPool_.get());
-  auto serializer = serde_.createIterativeSerializer(
-      asRowType(vector->type()), vector->size(), arena.get(), /* serdeOptions 
*/ nullptr);
+  auto serializer =
+      serde_.createIterativeSerializer(asRowType(vector->type()), 
vector->size(), arena.get(), &serdeOptions_);
   const facebook::velox::IndexRange allRows{0, vector->size()};
   serializer->append(vector, folly::Range(&allRows, 1));
   auto serializedSize = serializer->maxSerializedSize();
@@ -736,7 +736,7 @@ arrow::Status VeloxShuffleWriter::splitComplexType(const 
facebook::velox::RowVec
         arenas_[partition] = 
std::make_unique<facebook::velox::StreamArena>(veloxPool_.get());
       }
       complexTypeData_[partition] = serde_.createIterativeSerializer(
-          complexWriteType_, partition2RowCount_[partition], 
arenas_[partition].get(), /* serdeOptions */ nullptr);
+          complexWriteType_, partition2RowCount_[partition], 
arenas_[partition].get(), &serdeOptions_);
     }
     rowIndexs[partition].emplace_back(facebook::velox::IndexRange{row, 1});
   }
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h 
b/cpp/velox/shuffle/VeloxShuffleWriter.h
index f76ad9ad1..d0bc8445c 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -202,6 +202,7 @@ class VeloxShuffleWriter final : public ShuffleWriter {
       : ShuffleWriter(numPartitions, std::move(partitionWriter), 
std::move(options), pool),
         veloxPool_(std::move(veloxPool)) {
     arenas_.resize(numPartitions);
+    serdeOptions_.useLosslessTimestamp = true;
   }
 
   arrow::Status init();
@@ -409,6 +410,7 @@ class VeloxShuffleWriter final : public ShuffleWriter {
   std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
   std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_;
   facebook::velox::serializer::presto::PrestoVectorSerde serde_;
+  facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions 
serdeOptions_;
 
   // stat
   enum CpuWallTimingType {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to