[
https://issues.apache.org/jira/browse/FLINK-16686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17761031#comment-17761031
]
Alexis Sarda-Espinosa commented on FLINK-16686:
-----------------------------------------------
cc [~yunta]
I wanted to check whether this problem also occurs with Avro, so I ran an
experiment similar to the one I did before with Kryo. So far I have only
considered {{GenericRecord}} serialization, but I can confirm that the problem
occurs in that case as well.
I implemented a custom serializer, similar to what's described
[in this example|https://medium.com/wbaa/evolve-your-data-model-in-flinks-state-using-avro-f26982afa399],
and after running a dummy job for 2 hours I got this exception:
{noformat}
AvroEmbeddedRocksDBTest > avroSerialization() STANDARD_ERROR
java.lang.NullPointerException
    at org.apache.avro.Schema.applyAliases(Schema.java:1872)
    at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:131)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
    at com.avro.flink.GenericRecordTypeSerializer.deserialize(GenericRecordTypeSerializer.kt:56)
    at com.avro.flink.GenericRecordTypeSerializer.deserialize(GenericRecordTypeSerializer.kt:16)
    at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
    at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:205)
    at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:191)
{noformat}
Note that this stack trace does _not_ mention a class loader. Some more details
from the dump, in case they are useful:
{noformat}
Current thread (0x00007f7ae80b0800): JavaThread "Thread-9949" [_thread_in_vm, id=9258, stack(0x00007f7a59a01000,0x00007f7a5a200000)]
Stack: [0x00007f7a59a01000,0x00007f7a5a200000], sp=0x00007f7a5a1fcdc0, free space=8175k
Native frames: (J=compiled Java code, A=aot compiled Java code, j=interpreted, Vv=VM code, C=native code)
V [libjvm.so+0x7b8f75]
V [libjvm.so+0x980aab]
C [librocksdbjni-linux64.so+0x222ce1] JavaListElementFilter::NextUnexpiredOffset(rocksdb::Slice const&, long, long) const+0x121
C [librocksdbjni-linux64.so+0x648941] rocksdb::flink::FlinkCompactionFilter::ListDecide(rocksdb::Slice const&, std::string*) const+0x81
C [librocksdbjni-linux64.so+0x648de7] rocksdb::flink::FlinkCompactionFilter::FilterV2(int, rocksdb::Slice const&, rocksdb::CompactionFilter::ValueType, rocksdb::Slice const&, std::string*, std::string*) const+0xc7
C [librocksdbjni-linux64.so+0x3cce72] rocksdb::MergeHelper::FilterMerge(rocksdb::Slice const&, rocksdb::Slice const&)+0x172
C [librocksdbjni-linux64.so+0x3cdf06] rocksdb::MergeHelper::MergeUntil(rocksdb::InternalIteratorBase<rocksdb::Slice>*, rocksdb::CompactionRangeDelAggregator*, unsigned long, bool, bool)+0xcc6
C [librocksdbjni-linux64.so+0x2d2a81] rocksdb::CompactionIterator::NextFromInput()+0x9a1
C [librocksdbjni-linux64.so+0x2d4911] rocksdb::CompactionIterator::SeekToFirst()+0x11
C [librocksdbjni-linux64.so+0x2dd2be] rocksdb::CompactionJob::ProcessKeyValueCompaction(rocksdb::CompactionJob::SubcompactionState*)+0x55e
C [librocksdbjni-linux64.so+0x2de668] rocksdb::CompactionJob::Run()+0x278
C [librocksdbjni-linux64.so+0x33cec4] rocksdb::DBImpl::BackgroundCompaction(bool*, rocksdb::JobContext*, rocksdb::LogBuffer*, rocksdb::DBImpl::PrepickedCompaction*, rocksdb::Env::Priority)+0x1084
C [librocksdbjni-linux64.so+0x3407e7] rocksdb::DBImpl::BackgroundCallCompaction(rocksdb::DBImpl::PrepickedCompaction*, rocksdb::Env::Priority)+0xd7
C [librocksdbjni-linux64.so+0x340dda] rocksdb::DBImpl::BGWorkCompaction(void*)+0x3a
C [librocksdbjni-linux64.so+0x5e5744] rocksdb::ThreadPoolImpl::Impl::BGThread(unsigned long)+0x254
C [librocksdbjni-linux64.so+0x5e58ed] rocksdb::ThreadPoolImpl::Impl::BGThreadWrapper(void*)+0x5d
{noformat}
> [State TTL] Make user class loader available in native RocksDB compaction
> thread
> --------------------------------------------------------------------------------
>
> Key: FLINK-16686
> URL: https://issues.apache.org/jira/browse/FLINK-16686
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.8.0, 1.11.3, 1.13.0, 1.12.3
> Reporter: Andrey Zagrebin
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> The issue was initially reported
> [here|https://stackoverflow.com/questions/60745711/flink-kryo-serializer-because-chill-serializer-couldnt-be-found].
> The problem is that the Java code of the Flink compaction filter is called
> from RocksDB's native C++ code, in the context of the native compaction
> thread. RocksDB has utilities to create a Java thread context for the Flink
> Java callback. Presumably, the Java thread context class loader is not set at
> all, and if it is queried, the result is a NullPointerException.
> The job in the report used a list state with TTL enabled. The compaction
> filter has to deserialise elements to check expiration. The deserialiser
> relies on Kryo, which queries the thread context class loader; this is
> expected to be the user class loader of the task but turns out to be null.
> We should investigate how to pass the user class loader to the compaction
> thread of the list state with TTL.
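As a rough illustration of the hypothesis in the issue description above, the following plain-Java sketch shows a callback that sees a null thread context class loader, and a wrapper that installs a loader for the duration of the call. It does not use Flink or RocksDB. The class {{ContextClassLoaderSketch}} and its methods are hypothetical names; a regular thread with its context class loader cleared stands in for the native compaction thread (an assumption, not a reproduction), and the system class loader stands in for the task's user class loader. It is not the fix adopted for this issue.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

public class ContextClassLoaderSketch {

    /**
     * Runs the callback on a fresh thread whose context class loader has been
     * cleared. ASSUMPTION: this only mimics what the issue suspects about the
     * natively attached compaction thread; it is not that thread.
     */
    public static <T> T runOnThreadWithoutContextLoader(Callable<T> callback) throws Exception {
        AtomicReference<T> result = new AtomicReference<>();
        AtomicReference<Exception> failure = new AtomicReference<>();
        Thread thread = new Thread(() -> {
            try {
                result.set(callback.call());
            } catch (Exception e) {
                failure.set(e);
            }
        });
        // A new thread normally inherits its parent's loader, so clear it explicitly.
        thread.setContextClassLoader(null);
        thread.start();
        thread.join();
        if (failure.get() != null) {
            throw failure.get();
        }
        return result.get();
    }

    /**
     * Wraps a callback so that the given loader is the context class loader
     * while it runs; the previous value is restored afterwards.
     */
    public static <T> Callable<T> withContextClassLoader(ClassLoader loader, Callable<T> callback) {
        return () -> {
            Thread current = Thread.currentThread();
            ClassLoader previous = current.getContextClassLoader();
            current.setContextClassLoader(loader);
            try {
                return callback.call();
            } finally {
                current.setContextClassLoader(previous);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the task's user class loader (hypothetical choice).
        ClassLoader userLoader = ClassLoader.getSystemClassLoader();
        // Stand-in for a deserializer that consults the context class loader.
        Callable<ClassLoader> lookup = () -> Thread.currentThread().getContextClassLoader();

        ClassLoader unguarded = runOnThreadWithoutContextLoader(lookup);
        ClassLoader guarded =
                runOnThreadWithoutContextLoader(withContextClassLoader(userLoader, lookup));

        System.out.println("unguarded lookup is null: " + (unguarded == null));
        System.out.println("guarded lookup is user loader: " + (guarded == userLoader));
    }
}
```

Whether the compaction filter callback can be wrapped this way inside Flink is what the issue asks to investigate. The Avro stack trace in the comment above does not mention a class loader, so a guard like this may not cover that case.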