[ https://issues.apache.org/jira/browse/KAFKA-9880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
A. Sophie Blee-Goldman resolved KAFKA-9880. ------------------------------------------- Fix Version/s: 2.6.0 Resolution: Fixed > Error while range compacting during bulk loading of FIFO compacted RocksDB > Store > -------------------------------------------------------------------------------- > > Key: KAFKA-9880 > URL: https://issues.apache.org/jira/browse/KAFKA-9880 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.4.1 > Reporter: Nicolas Carlot > Priority: Major > Fix For: 2.6.0 > > > > When restoring a non empty RocksDB state store, if it is customized to use > FIFOCompaction, the following exception is thrown: > > {code:java} > // > org.apache.kafka.streams.errors.ProcessorStateException: Error while range > compacting during restoring store merge_store > at > org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) > [kafka-stream-router.jar:?] > Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels > at org.rocksdb.RocksDB.compactRange(Native Method) > ~[kafka-stream-router.jar:?] > at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) > ~[kafka-stream-router.jar:?] > ... 11 more > {code} > > > Compaction is configured through an implementation of RocksDBConfigSetter. > The exception si gone as soon as I remove: > {code:java} > // > CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO(); > fifoOptions.setMaxTableFilesSize(maxSize); > fifoOptions.setAllowCompaction(true); > options.setCompactionOptionsFIFO(fifoOptions); > options.setCompactionStyle(CompactionStyle.FIFO); > {code} > > > Bulk loading works fine when the store is non-existent / empty. This occurs > only when there are a minimum amount of data in it. I guess it happens when > the amount SST layers is increased. > I'm currently using a forked version of Kafka 2.4.1 customizing the > RocksDBStore class with this modification as a work around: > > {code:java} > // > public void toggleDbForBulkLoading() { > try { > db.compactRange(columnFamily, true, 1, 0); > } catch (final RocksDBException e) { > try { > if (columnFamily.getDescriptor().getOptions().compactionStyle() != > CompactionStyle.FIFO) { > throw new ProcessorStateException("Error while range compacting > during restoring store " + name, e); > } > else { > log.warn("Compaction of store " + name + " for bulk loading > failed. Will continue without compacted store, which will be slower.", e); > } > } catch (RocksDBException e1) { > throw new ProcessorStateException("Error while range compacting > during restoring store " + name, e); > } > } > } > {code} > > I'm not very proud of this workaround, but it suits my use cases well. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)