This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 5856421e31 GH-39921: [Go][Parquet] ColumnWriter not reset TotalCompressedBytes after Flush (#39922)
5856421e31 is described below
commit 5856421e31b163104570d0305cb79f323cf488a6
Author: mwish <[email protected]>
AuthorDate: Mon Feb 5 23:14:48 2024 +0800
GH-39921: [Go][Parquet] ColumnWriter not reset TotalCompressedBytes after Flush (#39922)
### Rationale for this change
See https://github.com/apache/arrow/issues/39921
### What changes are included in this PR?
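Stop resetting `totalCompressedBytes` to zero when `FlushBufferedDataPages` is called, and return the error from `WriteDictionaryPage` in `Close` instead of ignoring it.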
### Are these changes tested?
Yes, by the new `TestDictionaryFallbackStatsV1` and `TestDictionaryFallbackStatsV2` tests.
### Are there any user-facing changes?
Yes, it's a bugfix
* Closes: #39921
Authored-by: mwish <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/parquet/file/column_writer.go | 5 +++--
go/parquet/file/column_writer_test.go | 28 ++++++++++++++++++++++++++++
2 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/go/parquet/file/column_writer.go b/go/parquet/file/column_writer.go
index ac857d17e6..36663b10b8 100755
--- a/go/parquet/file/column_writer.go
+++ b/go/parquet/file/column_writer.go
@@ -397,7 +397,6 @@ func (w *columnWriter) FlushBufferedDataPages() (err error) {
}
}
w.pages = w.pages[:0]
- w.totalCompressedBytes = 0
return
}
@@ -542,7 +541,9 @@ func (w *columnWriter) Close() (err error) {
if !w.closed {
w.closed = true
if w.hasDict && !w.fallbackToNonDict {
- w.WriteDictionaryPage()
+ if err = w.WriteDictionaryPage(); err != nil {
+ return err
+ }
}
if err = w.FlushBufferedDataPages(); err != nil {
diff --git a/go/parquet/file/column_writer_test.go b/go/parquet/file/column_writer_test.go
index 8011ac2487..321e7b730d 100755
--- a/go/parquet/file/column_writer_test.go
+++ b/go/parquet/file/column_writer_test.go
@@ -426,6 +426,26 @@ func (p *PrimitiveWriterTestSuite) testDictionaryFallbackEncoding(version parque
}
}
+func (p *PrimitiveWriterTestSuite) testDictionaryFallbackAndCompressedSize(version parquet.Version) {
+ p.GenerateData(SmallSize)
+ props := parquet.DefaultColumnProperties()
+ props.DictionaryEnabled = true
+
+ if version == parquet.V1_0 {
+ props.Encoding = parquet.Encodings.PlainDict
+ } else {
+ props.Encoding = parquet.Encodings.RLEDict
+ }
+
+ writer := p.buildWriter(SmallSize, props, parquet.WithVersion(version))
+ p.WriteBatchValues(writer, nil, nil)
+ writer.FallbackToPlain()
+ p.NotEqual(0, writer.TotalCompressedBytes())
+ writer.Close()
+ p.NotEqual(0, writer.TotalCompressedBytes())
+ p.NotEqual(0, writer.TotalBytesWritten())
+}
+
func (p *PrimitiveWriterTestSuite) TestRequiredPlain() {
p.testRequiredWithEncoding(parquet.Encodings.Plain)
}
@@ -575,6 +595,14 @@ func (p *PrimitiveWriterTestSuite) TestDictionaryFallbackEncodingV2() {
p.testDictionaryFallbackEncoding(parquet.V2_LATEST)
}
+func (p *PrimitiveWriterTestSuite) TestDictionaryFallbackStatsV1() {
+ p.testDictionaryFallbackAndCompressedSize(parquet.V1_0)
+}
+
+func (p *PrimitiveWriterTestSuite) TestDictionaryFallbackStatsV2() {
+ p.testDictionaryFallbackAndCompressedSize(parquet.V2_LATEST)
+}
+
func (p *PrimitiveWriterTestSuite) TestOptionalNullValueChunk() {
// test case for NULL values
p.SetupSchema(parquet.Repetitions.Optional, 1)