asfgit closed pull request #7434: [FLINK-11287] [rocksdb] RocksDBListState
should be using registered serializer in state meta infos
URL: https://github.com/apache/flink/pull/7434
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index 8d85e74aca7..b636ed08bd2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -40,7 +40,6 @@
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -192,7 +191,6 @@ public void testKeyedListStateMigration() throws Exception {
}
@Test
- @Ignore("This currently doesn't pass because the ListSerializer doesn't
respect the reconfigured case, yet.")
public void testKeyedListStateSerializerReconfiguration() throws
Exception {
final String stateName = "test-name";
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 13f5559405e..72a5bc60b20 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -19,7 +19,6 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -77,7 +76,6 @@
* @param namespaceSerializer The serializer for the namespace.
* @param valueSerializer The serializer for the state.
* @param defaultValue The default value for the state.
- * @param elementSerializer The serializer for elements of the list
state.
* @param backend The backend for which this state is bind to.
*/
private RocksDBListState(
@@ -85,11 +83,12 @@ private RocksDBListState(
TypeSerializer<N> namespaceSerializer,
TypeSerializer<List<V>> valueSerializer,
List<V> defaultValue,
- TypeSerializer<V> elementSerializer,
RocksDBKeyedStateBackend<K> backend) {
super(columnFamily, namespaceSerializer, valueSerializer,
defaultValue, backend);
- this.elementSerializer = elementSerializer;
+
+ ListSerializer<V> castedListSerializer = (ListSerializer<V>)
valueSerializer;
+ this.elementSerializer =
castedListSerializer.getElementSerializer();
}
@Override
@@ -281,7 +280,6 @@ public void migrateSerializedValue(
registerResult.f1.getNamespaceSerializer(),
(TypeSerializer<List<E>>)
registerResult.f1.getStateSerializer(),
(List<E>) stateDesc.getDefaultValue(),
- ((ListStateDescriptor<E>)
stateDesc).getElementSerializer(),
backend);
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services