This is an automated email from the ASF dual-hosted git repository.
zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 646015857 [VL] Fix timestamp precision loss in serializer (#5376)
646015857 is described below
commit 6460158578f2264b2abfa8c4f5648d96373cde41
Author: Zhen Li <[email protected]>
AuthorDate: Mon Apr 15 12:04:44 2024 +0800
[VL] Fix timestamp precision loss in serializer (#5376)
[VL] Fix timestamp precision loss in serializer
---
.../test/scala/org/apache/gluten/execution/TestOperator.scala | 11 +++++++++++
.../operators/serializer/VeloxColumnarBatchSerializer.cc | 5 +++--
cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h | 1 +
cpp/velox/shuffle/VeloxShuffleReader.cc | 4 +++-
cpp/velox/shuffle/VeloxShuffleWriter.cc | 6 +++---
cpp/velox/shuffle/VeloxShuffleWriter.h | 2 ++
6 files changed, 23 insertions(+), 6 deletions(-)
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index c3e14ef19..a2a4b0c4a 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -1307,4 +1307,15 @@ class TestOperator extends
VeloxWholeStageTransformerSuite {
assert(plan1.find(_.isInstanceOf[ProjectExecTransformer]).isDefined)
assert(plan2.find(_.isInstanceOf[ProjectExecTransformer]).isDefined)
}
+
+ test("timestamp broadcast join") {
+ spark.range(0, 5).createOrReplaceTempView("right")
+ spark.sql("SELECT id, timestamp_micros(id) as ts from
right").createOrReplaceTempView("left")
+ val expected = spark.sql("SELECT unix_micros(ts) from left")
+ val df = spark.sql(
+ "SELECT unix_micros(ts)" +
+ " FROM left RIGHT OUTER JOIN right ON left.id = right.id")
+ // Verify there is no precision loss for timestamp columns after data broadcast.
+ checkAnswer(df, expected)
+ }
}
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
index d0ebd8057..0f2cb9a56 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
@@ -50,6 +50,7 @@ VeloxColumnarBatchSerializer::VeloxColumnarBatchSerializer(
ArrowSchemaRelease(cSchema); // otherwise the c schema leaks memory
}
serde_ = std::make_unique<serializer::presto::PrestoVectorSerde>();
+ options_.useLosslessTimestamp = true;
}
std::shared_ptr<arrow::Buffer>
VeloxColumnarBatchSerializer::serializeColumnarBatches(
@@ -60,7 +61,7 @@ std::shared_ptr<arrow::Buffer>
VeloxColumnarBatchSerializer::serializeColumnarBa
auto numRows = firstRowVector->size();
auto arena = std::make_unique<StreamArena>(veloxPool_.get());
auto rowType = asRowType(firstRowVector->type());
- auto serializer = serde_->createIterativeSerializer(rowType, numRows,
arena.get(), /* serdeOptions */ nullptr);
+ auto serializer = serde_->createIterativeSerializer(rowType, numRows,
arena.get(), &options_);
for (auto& batch : batches) {
auto rowVector = VeloxColumnarBatch::from(veloxPool_.get(),
batch)->getRowVector();
numRows = rowVector->size();
@@ -84,7 +85,7 @@ std::shared_ptr<arrow::Buffer>
VeloxColumnarBatchSerializer::serializeColumnarBa
std::shared_ptr<ColumnarBatch>
VeloxColumnarBatchSerializer::deserialize(uint8_t* data, int32_t size) {
RowVectorPtr result;
auto byteStream = toByteStream(data, size);
- serde_->deserialize(byteStream.get(), veloxPool_.get(), rowType_, &result,
/* serdeOptions */ nullptr);
+ serde_->deserialize(byteStream.get(), veloxPool_.get(), rowType_, &result,
&options_);
return std::make_shared<VeloxColumnarBatch>(result);
}
} // namespace gluten
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
index 79e247c67..18539a245 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
@@ -41,6 +41,7 @@ class VeloxColumnarBatchSerializer final : public
ColumnarBatchSerializer {
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
facebook::velox::RowTypePtr rowType_;
std::unique_ptr<facebook::velox::serializer::presto::PrestoVectorSerde>
serde_;
+ facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions
options_;
};
} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 950a2a05b..5c4e01936 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -183,7 +183,9 @@ RowVectorPtr readComplexType(BufferPtr buffer, RowTypePtr&
rowType, memory::Memo
RowVectorPtr result;
auto byteStream = toByteStream(const_cast<uint8_t*>(buffer->as<uint8_t>()),
buffer->size());
auto serde = std::make_unique<serializer::presto::PrestoVectorSerde>();
- serde->deserialize(byteStream.get(), pool, rowType, &result, /* serdeOptions
*/ nullptr);
+ serializer::presto::PrestoVectorSerde::PrestoOptions options;
+ options.useLosslessTimestamp = true;
+ serde->deserialize(byteStream.get(), pool, rowType, &result, &options);
return result;
}
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc
b/cpp/velox/shuffle/VeloxShuffleWriter.cc
index 909d4ebc3..8b69471e7 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc
@@ -274,8 +274,8 @@ void VeloxShuffleWriter::setPartitionBufferSize(uint16_t
newSize) {
arrow::Result<std::shared_ptr<arrow::Buffer>>
VeloxShuffleWriter::generateComplexTypeBuffers(
facebook::velox::RowVectorPtr vector) {
auto arena =
std::make_unique<facebook::velox::StreamArena>(veloxPool_.get());
- auto serializer = serde_.createIterativeSerializer(
- asRowType(vector->type()), vector->size(), arena.get(), /* serdeOptions
*/ nullptr);
+ auto serializer =
+ serde_.createIterativeSerializer(asRowType(vector->type()),
vector->size(), arena.get(), &serdeOptions_);
const facebook::velox::IndexRange allRows{0, vector->size()};
serializer->append(vector, folly::Range(&allRows, 1));
auto serializedSize = serializer->maxSerializedSize();
@@ -736,7 +736,7 @@ arrow::Status VeloxShuffleWriter::splitComplexType(const
facebook::velox::RowVec
arenas_[partition] =
std::make_unique<facebook::velox::StreamArena>(veloxPool_.get());
}
complexTypeData_[partition] = serde_.createIterativeSerializer(
- complexWriteType_, partition2RowCount_[partition],
arenas_[partition].get(), /* serdeOptions */ nullptr);
+ complexWriteType_, partition2RowCount_[partition],
arenas_[partition].get(), &serdeOptions_);
}
rowIndexs[partition].emplace_back(facebook::velox::IndexRange{row, 1});
}
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h
b/cpp/velox/shuffle/VeloxShuffleWriter.h
index f76ad9ad1..d0bc8445c 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -202,6 +202,7 @@ class VeloxShuffleWriter final : public ShuffleWriter {
: ShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), pool),
veloxPool_(std::move(veloxPool)) {
arenas_.resize(numPartitions);
+ serdeOptions_.useLosslessTimestamp = true;
}
arrow::Status init();
@@ -409,6 +410,7 @@ class VeloxShuffleWriter final : public ShuffleWriter {
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_;
facebook::velox::serializer::presto::PrestoVectorSerde serde_;
+ facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions
serdeOptions_;
// stat
enum CpuWallTimingType {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]