[ https://issues.apache.org/jira/browse/FLINK-9572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510594#comment-16510594 ]
ASF GitHub Bot commented on FLINK-9572: --------------------------------------- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6156#discussion_r194951927 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java --- @@ -87,55 +87,14 @@ public RocksDBAggregatingState( @Override public R get() throws IOException { - try { - // prepare the current key and namespace for RocksDB lookup - writeCurrentKeyWithGroupAndNamespace(); - final byte[] key = keySerializationStream.toByteArray(); - - // get the current value - final byte[] valueBytes = backend.db.get(columnFamily, key); - - if (valueBytes == null) { - return null; - } - - ACC accumulator = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - return aggFunction.getResult(accumulator); - } - catch (IOException | RocksDBException e) { - throw new IOException("Error while retrieving value from RocksDB", e); - } + return aggFunction.getResult(getInternal()); } @Override public void add(T value) throws IOException { - try { - // prepare the current key and namespace for RocksDB lookup - writeCurrentKeyWithGroupAndNamespace(); - final byte[] key = keySerializationStream.toByteArray(); - keySerializationStream.reset(); - - // get the current value - final byte[] valueBytes = backend.db.get(columnFamily, key); - - // deserialize the current accumulator, or create a blank one - ACC accumulator = valueBytes == null ? 
- aggFunction.createAccumulator() : - valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - - // aggregate the value into the accumulator - accumulator = aggFunction.add(value, accumulator); - - // serialize the new accumulator - final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); - valueSerializer.serialize(accumulator, out); - - // write the new value to RocksDB - backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); - } - catch (IOException | RocksDBException e) { - throw new IOException("Error while adding value to RocksDB", e); - } + ACC accumulator = getInternal(); + accumulator = accumulator == null ? aggFunction.createAccumulator() : accumulator; + updateInternal(aggFunction.add(value, accumulator)); --- End diff -- We currently have to serialize the key bytes twice, whereas the previous version only needed to serialize the key bytes once. > Extend InternalAppendingState with internal stored state access > --------------------------------------------------------------- > > Key: FLINK-9572 > URL: https://issues.apache.org/jira/browse/FLINK-9572 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing > Affects Versions: 1.6.0 > Reporter: Andrey Zagrebin > Assignee: Andrey Zagrebin > Priority: Major > Fix For: 1.6.0 > > > > {code:java} > public interface InternalAppendingState<K, N, IN, SV, OUT> ... { > SV getInternal(); > void updateInternal(SV); > } > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)