[ https://issues.apache.org/jira/browse/HUDI-1205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218427#comment-17218427 ]
Lee Huynh commented on HUDI-1205:
---------------------------------
ASSUMPTION: The following comment assumes that an overflow of ValueMetadata.sizeOfValue is the actual cause of the issue.
[GitHub issue #1890|https://github.com/apache/hudi/issues/1890] points to [ValueMetadata.sizeOfValue|https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java#L353] as the field that is overflowing. ValueMetadata.sizeOfValue is set only in the ValueMetadata constructor, and that constructor is called only in the [DiskBasedMap.put() method|https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java#L229], copied below (the comment about valueSize is mine):
{code:java}
private synchronized R put(T key, R value, boolean flush) {
  try {
    byte[] val = SerializationUtils.serialize(value);
    Integer valueSize = val.length;
    Long timestamp = System.currentTimeMillis();
    // ValueMetadata.sizeOfValue is set to valueSize
    this.valueMetadataMap.put(key,
        new DiskBasedMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp));
    byte[] serializedKey = SerializationUtils.serialize(key);
    filePosition
        .set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle,
            new FileEntry(SpillableMapUtils.generateChecksum(val),
                serializedKey.length, valueSize, serializedKey, val, timestamp)));
    if (flush) {
      flushToDisk();
    }
  } catch (IOException io) {
    throw new HoodieIOException("Unable to store data in Disk Based map", io);
  }
  return value;
}
{code}
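To make the data flow in put() concrete, here is a toy model, not Hudi code: each serialized value is appended to a log, and an in-memory index records where it starts and how long it is, roughly the role ValueMetadata plays. The names ToySpillMap and Meta are hypothetical, and an in-memory ByteArrayOutputStream stands in for the spill file. Like sizeOfValue, the recorded size is an int taken from byte[].length.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical, not Hudi code) of a disk-backed map's write/read path.
public class ToySpillMap {
    // Hypothetical analogue of DiskBasedMap.ValueMetadata: offset into the log plus an int size.
    static final class Meta {
        final long offset;
        final int size;

        Meta(long offset, int size) {
            this.offset = offset;
            this.size = size;
        }
    }

    private final ByteArrayOutputStream log = new ByteArrayOutputStream(); // stands in for the spill file
    private final Map<String, Meta> index = new HashMap<>();

    public void put(String key, byte[] serializedValue) {
        // Record metadata first, using the current end of the log as the offset (as put() does
        // with filePosition.get()); serializedValue.length is an int, so it cannot exceed Integer.MAX_VALUE.
        index.put(key, new Meta(log.size(), serializedValue.length));
        log.write(serializedValue, 0, serializedValue.length);
    }

    public byte[] get(String key) {
        Meta m = index.get(key);
        if (m == null) {
            return null;
        }
        byte[] all = log.toByteArray();
        int start = (int) m.offset;
        return Arrays.copyOfRange(all, start, start + m.size);
    }

    public static void main(String[] args) {
        ToySpillMap map = new ToySpillMap();
        map.put("a", "hello".getBytes(StandardCharsets.UTF_8));
        map.put("b", "world!".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(map.get("b"), StandardCharsets.UTF_8)); // world!
    }
}
```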
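We see that valueSize is the length of the serialized byte array. Java array lengths are ints, so the maximum array length is Integer.MAX_VALUE (2^31 - 1) elements, which is just under 2 GB for a byte array. Therefore, simply changing ValueMetadata.sizeOfValue to Long will not fix this issue: SerializationUtils.serialize() cannot return a byte array larger than that in the first place. For the same reason, treating ValueMetadata.sizeOfValue as unsigned won't work either.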
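A small illustration of the array-length argument, as a sketch: byte[].length is an int, so Integer.MAX_VALUE bounds any serialized value, and narrowing a larger long size into an int field silently wraps to a negative number. The class and the truncateToIntSize helper are hypothetical; they only mimic what storing a size above 2 GB in an int would do.

```java
// Hypothetical illustration, not Hudi code.
public class ArrayLengthLimit {
    // Array lengths are ints, so no byte[] can have more than Integer.MAX_VALUE elements.
    static final long MAX_ARRAY_LENGTH = Integer.MAX_VALUE; // 2^31 - 1

    // What an int size field would hold for a payload of `bytes` bytes.
    static int truncateToIntSize(long bytes) {
        return (int) bytes; // narrowing conversion keeps only the low 32 bits
    }

    public static void main(String[] args) {
        long threeGiB = 3L * 1024 * 1024 * 1024;
        System.out.println(MAX_ARRAY_LENGTH);            // 2147483647
        System.out.println(truncateToIntSize(threeGiB)); // -1073741824
    }
}
```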
Furthermore, the [SpillableMapUtils.readInternal()|https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java#L52] method receives the value of ValueMetadata.sizeOfValue as its valueLength parameter, which corresponds to a 4-byte size field in the on-disk entry. If we change ValueMetadata.sizeOfValue to a Long, it cannot be written to the file without breaking backwards compatibility. Treating the 4-byte field as unsigned would work at this particular point, but the previous point about the maximum array length still stands.
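To illustrate the unsigned option at this point, here is a sketch that encodes a size into a 4-byte big-endian field (the same width DataOutput.writeInt produces) and decodes it both as signed and, via Integer.toUnsignedLong, as unsigned. The class and method names are hypothetical, and the single-field layout is a simplification of the real spill-file entry. The unsigned reading extends the range to 2^32 - 1 without changing the field width, but the value itself would still have to fit in a byte[].

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a fixed 4-byte size field, not Hudi's on-disk format.
public class UnsignedSizeField {
    static final long MAX_UNSIGNED_INT = 0xFFFF_FFFFL; // 2^32 - 1

    // Encodes a size into 4 big-endian bytes (ByteBuffer's default byte order).
    static byte[] encodeSize(long size) {
        if (size < 0 || size > MAX_UNSIGNED_INT) {
            throw new IllegalArgumentException("size does not fit in 4 bytes: " + size);
        }
        return ByteBuffer.allocate(Integer.BYTES).putInt((int) size).array();
    }

    // Signed reading: sizes of 2^31 and above come back negative.
    static long decodeSigned(byte[] field) {
        return ByteBuffer.wrap(field).getInt();
    }

    // Unsigned reading of the very same 4 bytes.
    static long decodeUnsigned(byte[] field) {
        return Integer.toUnsignedLong(ByteBuffer.wrap(field).getInt());
    }

    public static void main(String[] args) {
        byte[] field = encodeSize(3L * 1024 * 1024 * 1024); // 3 GiB
        System.out.println(field.length);          // 4
        System.out.println(decodeSigned(field));   // -1073741824
        System.out.println(decodeUnsigned(field)); // 3221225472
    }
}
```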
Either the serialized value must somehow be limited to under 2 GB, or SerializationUtils.serialize() must be changed so that it no longer returns its contents as a single byte array.
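For the second option, one possible direction (a sketch, not something Hudi or Kryo provides) is to serialize into a sink that stores bytes in fixed-size chunks and tracks the total length as a long. ChunkedByteSink and its methods are hypothetical. This only lifts the in-memory byte[] limit; the 4-byte size field on disk and every reader of it would still need a format change.

```java
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: an OutputStream whose total size can grow past Integer.MAX_VALUE bytes.
public class ChunkedByteSink extends OutputStream {
    private final int chunkSize;
    private final List<byte[]> chunks = new ArrayList<>();
    private int posInChunk; // next free slot in the last chunk
    private long totalSize; // tracked as a long, unlike byte[].length

    public ChunkedByteSink(int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.chunkSize = chunkSize;
        this.posInChunk = chunkSize; // forces a chunk to be allocated on the first write
    }

    // Returns the chunk that has free space, allocating a new one when the last is full.
    private byte[] currentChunk() {
        if (posInChunk == chunkSize) {
            chunks.add(new byte[chunkSize]);
            posInChunk = 0;
        }
        return chunks.get(chunks.size() - 1);
    }

    @Override
    public void write(int b) {
        byte[] chunk = currentChunk();
        chunk[posInChunk++] = (byte) b;
        totalSize++;
    }

    @Override
    public void write(byte[] b, int off, int len) {
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        while (len > 0) {
            byte[] chunk = currentChunk();
            int n = Math.min(len, chunkSize - posInChunk);
            System.arraycopy(b, off, chunk, posInChunk, n);
            posInChunk += n;
            off += n;
            len -= n;
            totalSize += n;
        }
    }

    public long size() {
        return totalSize;
    }

    public int chunkCount() {
        return chunks.size();
    }

    public static void main(String[] args) {
        ChunkedByteSink sink = new ChunkedByteSink(4);
        sink.write(new byte[10], 0, 10);
        sink.write(7);
        System.out.println(sink.size());       // 11
        System.out.println(sink.chunkCount()); // 3
    }
}
```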
> Serialization fail when log file is larger than 2GB
> ---------------------------------------------------
>
> Key: HUDI-1205
> URL: https://issues.apache.org/jira/browse/HUDI-1205
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Yanjia Gary Li
> Priority: Major
>
> When scanning the log file, if the log file (or log file group) is larger than
> 2 GB, serialization will fail because Hudi uses an Integer to store the size in
> bytes for the log file. The maximum value of an Integer, read as a byte count, is about 2 GB.
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$45/62103784
> Serialization trace:
> orderingVal (org.apache.hudi.common.model.OverwriteWithLatestAvroPayload)
> data (org.apache.hudi.common.model.HoodieRecord)
>   at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
>   at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
>   at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
>   at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
>   at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:107)
>   at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:81)
>   at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:217)
>   at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:211)
>   at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:207)
>   at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:168)
>   at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:55)
>   at org.apache.hudi.HoodieMergeOnReadRDD$$anon$1.hasNext(HoodieMergeOnReadRDD.scala:128)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.run(Task.scala:121)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$45/62103784
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
>   ... 31 more