[ 
https://issues.apache.org/jira/browse/FLINK-16686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17761031#comment-17761031
 ] 

Alexis Sarda-Espinosa commented on FLINK-16686:
-----------------------------------------------

cc [~yunta]

I wanted to check whether this problem also occurs with Avro, so I ran an 
experiment similar to the one I ran earlier with Kryo. I only considered 
{{GenericRecord}} serialization for now, and I can confirm that the problem 
occurs in that case too.

I implemented a custom serializer similar to the one described 
[in this example|https://medium.com/wbaa/evolve-your-data-model-in-flinks-state-using-avro-f26982afa399], 
and after running a dummy job for 2 hours I got this exception:

{noformat}
AvroEmbeddedRocksDBTest > avroSerialization() STANDARD_ERROR
    java.lang.NullPointerException
        at org.apache.avro.Schema.applyAliases(Schema.java:1872)
        at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:131)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
        at com.avro.flink.GenericRecordTypeSerializer.deserialize(GenericRecordTypeSerializer.kt:56)
        at com.avro.flink.GenericRecordTypeSerializer.deserialize(GenericRecordTypeSerializer.kt:16)
        at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
        at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:205)
        at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:191)
{noformat}

Note that this stack trace does _not_ mention a class loader. Here are some 
more details from the dump, in case they are useful:

{noformat}
Current thread (0x00007f7ae80b0800):  JavaThread "Thread-9949" [_thread_in_vm, id=9258, stack(0x00007f7a59a01000,0x00007f7a5a200000)]

Stack: [0x00007f7a59a01000,0x00007f7a5a200000],  sp=0x00007f7a5a1fcdc0,  free space=8175k
Native frames: (J=compiled Java code, A=aot compiled Java code, j=interpreted, Vv=VM code, C=native code)
V  [libjvm.so+0x7b8f75]
V  [libjvm.so+0x980aab]
C  [librocksdbjni-linux64.so+0x222ce1]  JavaListElementFilter::NextUnexpiredOffset(rocksdb::Slice const&, long, long) const+0x121
C  [librocksdbjni-linux64.so+0x648941]  rocksdb::flink::FlinkCompactionFilter::ListDecide(rocksdb::Slice const&, std::string*) const+0x81
C  [librocksdbjni-linux64.so+0x648de7]  rocksdb::flink::FlinkCompactionFilter::FilterV2(int, rocksdb::Slice const&, rocksdb::CompactionFilter::ValueType, rocksdb::Slice const&, std::string*, std::string*) const+0xc7
C  [librocksdbjni-linux64.so+0x3cce72]  rocksdb::MergeHelper::FilterMerge(rocksdb::Slice const&, rocksdb::Slice const&)+0x172
C  [librocksdbjni-linux64.so+0x3cdf06]  rocksdb::MergeHelper::MergeUntil(rocksdb::InternalIteratorBase<rocksdb::Slice>*, rocksdb::CompactionRangeDelAggregator*, unsigned long, bool, bool)+0xcc6
C  [librocksdbjni-linux64.so+0x2d2a81]  rocksdb::CompactionIterator::NextFromInput()+0x9a1
C  [librocksdbjni-linux64.so+0x2d4911]  rocksdb::CompactionIterator::SeekToFirst()+0x11
C  [librocksdbjni-linux64.so+0x2dd2be]  rocksdb::CompactionJob::ProcessKeyValueCompaction(rocksdb::CompactionJob::SubcompactionState*)+0x55e
C  [librocksdbjni-linux64.so+0x2de668]  rocksdb::CompactionJob::Run()+0x278
C  [librocksdbjni-linux64.so+0x33cec4]  rocksdb::DBImpl::BackgroundCompaction(bool*, rocksdb::JobContext*, rocksdb::LogBuffer*, rocksdb::DBImpl::PrepickedCompaction*, rocksdb::Env::Priority)+0x1084
C  [librocksdbjni-linux64.so+0x3407e7]  rocksdb::DBImpl::BackgroundCallCompaction(rocksdb::DBImpl::PrepickedCompaction*, rocksdb::Env::Priority)+0xd7
C  [librocksdbjni-linux64.so+0x340dda]  rocksdb::DBImpl::BGWorkCompaction(void*)+0x3a
C  [librocksdbjni-linux64.so+0x5e5744]  rocksdb::ThreadPoolImpl::Impl::BGThread(unsigned long)+0x254
C  [librocksdbjni-linux64.so+0x5e58ed]  rocksdb::ThreadPoolImpl::Impl::BGThreadWrapper(void*)+0x5d
{noformat}
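
For readers less familiar with this code path: both traces end in the list 
element filter's {{nextUnexpiredOffset}}, which has to deserialize each list 
element to read its last-access timestamp, and that is why user serializer 
code ends up running on the native compaction thread. Below is a minimal, 
self-contained sketch of that kind of scan. The class name, the {{encode}} 
helper and the byte layout are made up for illustration and are not Flink's 
actual format or implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ListTtlScanSketch {

    // Hypothetical element layout (NOT Flink's wire format):
    // [8-byte last-access timestamp][4-byte payload length][payload bytes]
    static byte[] encode(long[] timestamps, String[] payloads) {
        int size = 0;
        byte[][] raw = new byte[payloads.length][];
        for (int i = 0; i < payloads.length; i++) {
            raw[i] = payloads[i].getBytes(StandardCharsets.UTF_8);
            size += Long.BYTES + Integer.BYTES + raw[i].length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (int i = 0; i < raw.length; i++) {
            buf.putLong(timestamps[i]).putInt(raw[i].length).put(raw[i]);
        }
        return buf.array();
    }

    // Returns the byte offset of the first element that has not expired,
    // or list.length if every element has expired.
    static int nextUnexpiredOffset(byte[] list, long ttlMillis, long nowMillis) {
        ByteBuffer buf = ByteBuffer.wrap(list);
        while (buf.hasRemaining()) {
            int elementStart = buf.position();
            // In the real filter this step goes through the user serializer.
            long lastAccess = buf.getLong();
            int payloadLength = buf.getInt();
            if (lastAccess + ttlMillis > nowMillis) {
                return elementStart; // first live element found
            }
            buf.position(buf.position() + payloadLength); // skip expired payload
        }
        return list.length;
    }

    public static void main(String[] args) {
        byte[] list = encode(new long[] {100L, 200L, 900L},
                             new String[] {"a", "bb", "ccc"});
        // The first two elements are older than the TTL, so the scan stops
        // at the start of the third element.
        System.out.println(nextUnexpiredOffset(list, 500L, 1000L));
    }
}
```

In this sketch the timestamp sits at a fixed position, so nothing beyond the 
header needs decoding. In the traces above, reading the timestamp goes through 
{{CompositeSerializer.deserialize}}, which also invokes the user value 
serializer.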

> [State TTL] Make user class loader available in native RocksDB compaction 
> thread
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-16686
>                 URL: https://issues.apache.org/jira/browse/FLINK-16686
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.8.0, 1.11.3, 1.13.0, 1.12.3
>            Reporter: Andrey Zagrebin
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> The issue was initially reported 
> [here|https://stackoverflow.com/questions/60745711/flink-kryo-serializer-because-chill-serializer-couldnt-be-found].
> The problem is that the Java code of the Flink compaction filter is called 
> from RocksDB's native C++ code, in the context of the native compaction 
> thread. RocksDB has utilities to create a Java thread context for the Flink 
> Java callback. Presumably, the Java thread context class loader is not set at 
> all, and querying it leads to a NullPointerException.
> In the reported case, a list state with TTL was enabled. The compaction 
> filter has to deserialise elements to check expiration. The deserialiser 
> relies on Kryo, which queries the thread context class loader; this is 
> expected to be the user class loader of the task but turns out to be null.
> We should investigate how to pass the user class loader to the compaction 
> thread of the list state with TTL.
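
To illustrate the situation described in the issue, here is a small plain-Java 
sketch of a callback running on a thread whose context class loader is null, 
together with one possible guard that sets a loader temporarily and restores 
the previous value afterwards. All names here ({{ContextClassLoaderSketch}}, 
{{withContextClassLoader}}) are hypothetical, the null loader is set explicitly 
to mimic the presumed state of the compaction thread, and this is not how 
Flink or RocksDB handle it. Since the Avro trace above does not involve a 
class loader, it is unclear whether such a guard would help in that case.

```java
import java.util.function.Supplier;

final class ContextClassLoaderSketch {

    // Hypothetical helper: run `action` with `loader` as the thread context
    // class loader, then restore whatever was set before (possibly null).
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the user class loader of the task.
        ClassLoader userLoader = ContextClassLoaderSketch.class.getClassLoader();

        Thread compactionLike = new Thread(() -> {
            // Simulates the Java callback invoked from the compaction thread.
            System.out.println("before: "
                    + Thread.currentThread().getContextClassLoader());
            boolean visible = withContextClassLoader(userLoader,
                    () -> Thread.currentThread().getContextClassLoader() == userLoader);
            System.out.println("inside guard, user loader visible: " + visible);
            System.out.println("after: "
                    + Thread.currentThread().getContextClassLoader());
        });
        // Assumption: mimic a thread whose context class loader was never set.
        compactionLike.setContextClassLoader(null);
        compactionLike.start();
        compactionLike.join();
    }
}
```

The try/finally restore matters because the thread belongs to a pool that is 
reused across compactions, so a guard should not leave a loader behind.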


