tzulitai commented on a change in pull request #7590: [FLINK-11329][core]
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252128586
##########
File path:
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
##########
@@ -92,7 +93,58 @@ protected Object getField(@Nonnull ValueWithTs<?> value,
int index) {
protected CompositeSerializer<ValueWithTs<?>>
createSerializerInstance(
PrecomputedParameters precomputed,
TypeSerializer<?>... originalSerializers) {
- return new Serializer(precomputed,
(TypeSerializer<Object>) originalSerializers[0]);
+
+ return new Serializer(precomputed,
originalSerializers[0], originalSerializers[1]);
+ }
+
+ TypeSerializer<?> getValueSerializer() {
+ return fieldSerializers[0];
+ }
+
+ @SuppressWarnings("unchecked")
+ TypeSerializer<Long> getTimestampSerializer() {
+ TypeSerializer<?> fieldSerializer = fieldSerializers[1];
+ return (TypeSerializer<Long>) fieldSerializer;
+ }
+
+ @Override
+ public TypeSerializerSnapshot<ValueWithTs<?>>
snapshotConfiguration() {
+ return new ValueWithTsSerializerSnapshot(this);
+ }
+ }
+
+ /**
+ * A {@link TypeSerializerSnapshot} for ValueWithTs Serializer.
+ */
+ public static final class ValueWithTsSerializerSnapshot extends
CompositeTypeSerializerSnapshot<ValueWithTs<?>, Serializer> {
+
+ private final static int VERSION = 2;
+
+ @SuppressWarnings("unused")
+ public ValueWithTsSerializerSnapshot() {
+ super(Serializer.class);
+ }
+
+ ValueWithTsSerializerSnapshot(Serializer serializerInstance) {
+ super(serializerInstance);
+ }
+
+ @Override
+ protected int getCurrentOuterSnapshotVersion() {
+ return VERSION;
+ }
+
+ @Override
+ protected TypeSerializer<?>[] getNestedSerializers(Serializer
outerSerializer) {
+ return new
TypeSerializer[]{outerSerializer.getValueSerializer(),
outerSerializer.getTimestampSerializer()};
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Serializer
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[]
nestedSerializers) {
+ TypeSerializer<?> valueSerializer =
nestedSerializers[0];
+ TypeSerializer<Long> timeSerializer =
(TypeSerializer<Long>) nestedSerializers[1];
Review comment:
nit: `time` --> `timestamp` for naming consistency
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services