[
https://issues.apache.org/jira/browse/PARQUET-1773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021281#comment-17021281
]
Tristan Davolt edited comment on PARQUET-1773 at 1/22/20 5:11 PM:
------------------------------------------------------------------
Full stack trace:
{code:java}
2020-01-22 15:37:42.241 ERROR [des-sss-applog-consumer,,,] 132 --- [pool-1-thread-1] c.d.e.h.ApplogConsumerMessageHandler : Exception :
java.lang.IllegalArgumentException: You cannot call toBytes() more than once without calling reset()
    at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:53)
    at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder.toBytes(RunLengthBitPackingHybridEncoder.java:254)
    at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter.getBytes(RunLengthBitPackingHybridValuesWriter.java:65)
    at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:148)
    at org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
    at org.apache.parquet.column.impl.ColumnWriterV1.write(ColumnWriterV1.java:200)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addBinary(MessageColumnIO.java:469)
    at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:346)
    at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
    at com.test.es.writers.ParquetS3Writer.write(ParquetS3Writer.java:250)
    at com.test.es.handler.ApplogConsumerMessageHandler.processMessage(ApplogConsumerMessageHandler.java:60)
    at com.test.des.kafka.StreamConsumer.consumeMessage(StreamConsumer.java:160)
    at com.test.des.kafka.StreamConsumer.run(StreamConsumer.java:97)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
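For context on the message itself: RunLengthBitPackingHybridEncoder encodes one page at a time, and the Preconditions check in the trace fires when toBytes() is called a second time before reset(). Below is a minimal sketch of that contract, using the internal encoder class directly; the constructor arguments are arbitrary demo values, not anything from our code.
{code:java}
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;

public class RleContractDemo {
    public static void main(String[] args) throws Exception {
        // bitWidth=3, initial capacity 64 bytes, page size 1024 bytes: arbitrary demo values
        RunLengthBitPackingHybridEncoder enc =
                new RunLengthBitPackingHybridEncoder(3, 64, 1024, new HeapByteBufferAllocator());
        enc.writeInt(4);
        BytesInput page = enc.toBytes();  // first call is fine
        // enc.toBytes();                 // a second call here would throw
        //                                // "You cannot call toBytes() more than once without calling reset()"
        enc.reset();                      // required before the encoder is reused for the next page
        enc.writeInt(5);
        System.out.println(page.size());
    }
}
{code}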
We've created a custom class (ParquetS3Writer) that initializes an instance of
AvroParquetWriter called parquetWriter. Our Kafka consumer creates an instance
of ParquetS3Writer. ParquetS3Writer.write calls parquetWriter.write with an
Avro GenericData.Record, which ultimately calls ParquetWriter.write with that
record. Every time we call ParquetWriter.write, we increment a counter to track
the number of payloads written. When the counter reaches a configured buffer
size, we call ParquetWriter.close, initialize a new AvroParquetWriter, and
repeat the process. A trimmed-down sketch of this rotation pattern follows.
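The sketch below is illustrative only: the class and field names match the description above, but the constructor, buffer handling, and output path are simplified stand-ins (the real code writes to S3).
{code:java}
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

public class ParquetS3Writer {
    private final Schema schema;
    private final int bufferSize;  // number of records per output file
    private ParquetWriter<GenericData.Record> parquetWriter;
    private int count;

    public ParquetS3Writer(Schema schema, int bufferSize) throws IOException {
        this.schema = schema;
        this.bufferSize = bufferSize;
        this.parquetWriter = newWriter();
    }

    private ParquetWriter<GenericData.Record> newWriter() throws IOException {
        // Illustrative path; the real implementation targets an S3 location
        Path path = new Path("s3a://bucket/applog/" + System.currentTimeMillis() + ".parquet");
        return AvroParquetWriter.<GenericData.Record>builder(path)
                .withSchema(schema)
                .build();
    }

    public synchronized void write(GenericData.Record record) throws IOException {
        parquetWriter.write(record);
        count++;
        if (count >= bufferSize) {
            // Rotate: close the current file and start a fresh writer
            parquetWriter.close();
            parquetWriter = newWriter();
            count = 0;
        }
    }
}
{code}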
The exception only starts being thrown after the service has run for a while
without any issues: it reaches the buffer size and repeats the cycle above many
times before failing. It may be worth mentioning that we are processing 5-15k
messages per minute using this method.
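One mitigation we may try (a sketch only, building on the class above; writeWithRecovery is a hypothetical name, not code we run today) is to treat any failed write as fatal for that writer instance and replace it immediately, since continuing to write through the same instance appears to lead to the "invalid state ... Current state: BLOCK" IOException described in this issue:
{code:java}
public synchronized void writeWithRecovery(GenericData.Record record) throws IOException {
    try {
        parquetWriter.write(record);
    } catch (RuntimeException | IOException e) {
        // After a failed write the writer's internal state is suspect,
        // so discard it rather than keep writing through it.
        try {
            parquetWriter.close();  // best-effort; the file is likely unusable
        } catch (Exception ignored) {
            // already broken; nothing useful to do here
        }
        parquetWriter = newWriter();  // start over with a fresh writer
        count = 0;
        throw e;  // let the caller decide whether to retry the record
    }
}
{code}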
> Parquet file in invalid state while writing to S3 when calling
> ParquetWriter.write
> ----------------------------------------------------------------------------------
>
> Key: PARQUET-1773
> URL: https://issues.apache.org/jira/browse/PARQUET-1773
> Project: Parquet
> Issue Type: Bug
> Components: parquet-mr
> Affects Versions: 1.10.0
> Reporter: Tristan Davolt
> Priority: Major
>
> This may be related to PARQUET-632. I am also writing parquet to S3, but I am
> calling ParquetWriter.write directly. I have multiple containerized instances
> consuming messages from Kafka, converting them to Parquet, and then writing
> to S3. One instance will begin to throw this exception for all new messages.
> Sometimes, the container will recover. Other times, it must be restarted
> manually to recover. I am unable to find any "error thrown previously."
> Exception:
> java.io.IOException
> Message:
> The file being written is in an invalid state. Probably caused by an error
> thrown previously. Current state: BLOCK
> Stacktrace:
> {code:java}
> org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
> org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
> org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291)
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:171)
> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
> org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308)
> {code}