This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae7861dd0a124213dd89b434175093dc64ee1904
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Tue Feb 26 19:43:21 2019 +0100

    [FLINK-9003][E2E] Add operators with input going through custom, stateful 
serialization.
---
 .../SingleThreadAccessCheckingTypeSerializer.java  | 203 +++++++++++++++++++++
 .../flink/api/common/typeutils/TypeSerializer.java |   5 +-
 .../tests/DataStreamAllroundTestJobFactory.java    |  60 ++++++
 .../tests/DataStreamAllroundTestProgram.java       |  71 ++++---
 .../tests/SingleThreadAccessCheckingTypeInfo.java  | 101 ++++++++++
 .../flink/streaming/tests/TestOperatorEnum.java    |  53 ++++++
 .../state/StateSnapshotTransformerTest.java        | 108 +----------
 7 files changed, 474 insertions(+), 127 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java
new file mode 100644
index 0000000..d2ffb97
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Internal
+public class SingleThreadAccessCheckingTypeSerializer<T> extends 
TypeSerializer<T> {
+       private static final long serialVersionUID = 131020282727167064L;
+
+       private final SingleThreadAccessChecker singleThreadAccessChecker;
+       private final TypeSerializer<T> originalSerializer;
+
+       public SingleThreadAccessCheckingTypeSerializer(TypeSerializer<T> 
originalSerializer) {
+               this.singleThreadAccessChecker = new 
SingleThreadAccessChecker();
+               this.originalSerializer = originalSerializer;
+       }
+
+       @Override
+       public boolean isImmutableType() {
+               try (SingleThreadAccessCheck ignored = 
singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       return originalSerializer.isImmutableType();
+               }
+       }
+
+       @Override
+       public TypeSerializer<T> duplicate() {
+               try (SingleThreadAccessCheck ignored = 
singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       return new 
SingleThreadAccessCheckingTypeSerializer<>(originalSerializer.duplicate());
+               }
+       }
+
+       @Override
+       public T createInstance() {
+               try (SingleThreadAccessCheck ignored = 
singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       return originalSerializer.createInstance();
+               }
+       }
+
+       @Override
+       public T copy(T from) {
+               try (SingleThreadAccessCheck ignored = 
singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       return originalSerializer.copy(from);
+               }
+       }
+
+       @Override
+       public T copy(T from, T reuse) {
+               try (SingleThreadAccessCheck ignored = 
singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       return originalSerializer.copy(from, reuse);
+               }
+       }
+
+       @Override
+       public int getLength() {
+               try (SingleThreadAccessCheck ignored = 
singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       return originalSerializer.getLength();
+               }
+       }
+
+       @Override
+       public void serialize(T record, DataOutputView target) throws 
IOException {
+               try (SingleThreadAccessCheck ignored = 
singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       originalSerializer.serialize(record, target);
+               }
+       }
+
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               try (SingleThreadAccessCheck ignored = 
singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       return originalSerializer.deserialize(source);
+               }
+       }
+
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               try (SingleThreadAccessCheck ignored = 
singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       return originalSerializer.deserialize(reuse, source);
+               }
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               try (SingleThreadAccessCheck ignored = 
singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       originalSerializer.copy(source, target);
+               }
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       return obj == this || // same instance; otherwise same wrapper class with equal wrapped serializers
+                               (obj != null && obj.getClass() == getClass() &&
+                                       originalSerializer.equals(((SingleThreadAccessCheckingTypeSerializer<?>) obj).originalSerializer));
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               try (SingleThreadAccessCheck ignored = 
singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       return originalSerializer.hashCode();
+               }
+       }
+
+       @Override
+       public TypeSerializerSnapshot<T> snapshotConfiguration() {
+               try (SingleThreadAccessCheck ignored = 
singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       return new 
SingleThreadAccessCheckingTypeSerializerSnapshot<>(this);
+               }
+       }
+
+       public static class SingleThreadAccessCheckingTypeSerializerSnapshot<T>
+               extends CompositeTypeSerializerSnapshot<T, 
SingleThreadAccessCheckingTypeSerializer<T>> {
+
+               @SuppressWarnings({"unchecked", "unused"})
+               public SingleThreadAccessCheckingTypeSerializerSnapshot() {
+                       
super((Class<SingleThreadAccessCheckingTypeSerializer<T>>) (Class<?>) 
SingleThreadAccessCheckingTypeSerializer.class);
+               }
+
+               
SingleThreadAccessCheckingTypeSerializerSnapshot(SingleThreadAccessCheckingTypeSerializer<T>
 serializerInstance) {
+                       super(serializerInstance);
+               }
+
+               @Override
+               protected int getCurrentOuterSnapshotVersion() {
+                       return 1;
+               }
+
+               @Override
+               protected TypeSerializer<?>[] 
getNestedSerializers(SingleThreadAccessCheckingTypeSerializer<T> 
outerSerializer) {
+                       return new TypeSerializer[] { 
outerSerializer.originalSerializer };
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               protected SingleThreadAccessCheckingTypeSerializer<T> 
createOuterSerializerWithNestedSerializers(
+                       TypeSerializer<?>[] nestedSerializers) {
+
+                       return new 
SingleThreadAccessCheckingTypeSerializer<>((TypeSerializer<T>) 
nestedSerializers[0]);
+               }
+       }
+
+       private void writeObject(ObjectOutputStream outputStream) throws 
IOException {
+               try (SingleThreadAccessCheck ignored = 
singleThreadAccessChecker.startSingleThreadAccessCheck()) {
+                       outputStream.defaultWriteObject();
+               }
+       }
+
+       private static class SingleThreadAccessChecker implements Serializable {
+               private static final long serialVersionUID = 
131020282727167064L;
+
+               private transient AtomicReference<Thread> currentThreadRef = 
new AtomicReference<>();
+
+               SingleThreadAccessCheck startSingleThreadAccessCheck() {
+                       assert(currentThreadRef.compareAndSet(null, 
Thread.currentThread())) :
+                               "The checker has concurrent access from " + 
currentThreadRef.get();
+                       return new SingleThreadAccessCheck(currentThreadRef);
+               }
+
+               private void readObject(ObjectInputStream inputStream) throws 
ClassNotFoundException, IOException {
+                       inputStream.defaultReadObject();
+                       currentThreadRef = new AtomicReference<>();
+               }
+       }
+
+       private static class SingleThreadAccessCheck implements AutoCloseable {
+               private final AtomicReference<Thread> currentThreadRef;
+
+               private SingleThreadAccessCheck(AtomicReference<Thread> 
currentThreadRef) {
+                       this.currentThreadRef = currentThreadRef;
+               }
+
+               @Override
+               public void close() {
+                       
assert(currentThreadRef.compareAndSet(Thread.currentThread(), null)) :
+                               "The checker has concurrent access from " + 
currentThreadRef.get();
+               }
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index f0036c4..74d54b9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -29,9 +29,8 @@ import java.io.Serializable;
  * This interface describes the methods that are required for a data type to 
be handled by the Flink
  * runtime. Specifically, this interface contains the serialization and 
copying methods.
  *
- * <p>The methods in this class are assumed to be stateless, such that it is 
effectively thread safe. Stateful
- * implementations of the methods may lead to unpredictable side effects and 
will compromise both stability and
- * correctness of the program.
+ * <p>The methods in this class are not necessarily thread safe. To avoid 
unpredictable side effects,
+ * it is recommended to call the {@code duplicate()} method and use one serializer 
instance per thread.
  *
  * <p><b>Upgrading TypeSerializers to the new TypeSerializerSnapshot model</b>
  *
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 06325ad..af39522 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -24,8 +24,11 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -34,6 +37,7 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
@@ -52,6 +56,10 @@ import 
org.apache.flink.streaming.tests.artificialstate.builder.ArtificialValueS
 import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.flink.streaming.tests.TestOperatorEnum.EVENT_IDENTITY_MAPPER;
+import static 
org.apache.flink.streaming.tests.TestOperatorEnum.MAPPER_RETURNS_OUT_WITH_CUSTOM_SER;
+import static 
org.apache.flink.streaming.tests.TestOperatorEnum.RESULT_TYPE_QUERYABLE_MAPPER_WITH_CUSTOM_SER;
+
 /**
  * A factory for components of general purpose test jobs for Flink's 
DataStream API operators and primitives.
  *
@@ -490,4 +498,56 @@ public class DataStreamAllroundTestJobFactory {
                        TEST_SLIDE_FACTOR.defaultValue()
                ));
        }
+
+       static DataStream<Event> 
verifyCustomStatefulTypeSerializer(DataStream<Event> eventStream) {
+               return eventStream
+                       .map(new 
EventIdentityFunctionWithCustomEventTypeInformation())
+                       
.name(RESULT_TYPE_QUERYABLE_MAPPER_WITH_CUSTOM_SER.getName())
+                       
.uid(RESULT_TYPE_QUERYABLE_MAPPER_WITH_CUSTOM_SER.getUid())
+                       // apply a keyBy so that we have a non-chained operator 
with Event as input type that goes through serialization
+                       .keyBy(new 
EventKeySelectorWithCustomKeyTypeInformation())
+
+                       .map(e -> e)
+                       .returns(new 
SingleThreadAccessCheckingTypeInfo<>(Event.class))
+                       .name(MAPPER_RETURNS_OUT_WITH_CUSTOM_SER.getName())
+                       .uid(MAPPER_RETURNS_OUT_WITH_CUSTOM_SER.getUid())
+                       // apply a keyBy so that we have a non-chained operator 
with Event as input type that goes through serialization
+                       .keyBy(new 
EventKeySelectorWithCustomKeyTypeInformation())
+
+                       .map(e -> e)
+                       .name(EVENT_IDENTITY_MAPPER.getName())
+                       .uid(EVENT_IDENTITY_MAPPER.getUid());
+       }
+
+       private static class EventIdentityFunctionWithCustomEventTypeInformation
+               implements MapFunction<Event, Event>, 
ResultTypeQueryable<Event> {
+
+               private final SingleThreadAccessCheckingTypeInfo<Event> 
typeInformation = new SingleThreadAccessCheckingTypeInfo<>(Event.class);
+
+               @Override
+               public Event map(Event value) {
+                       return value;
+               }
+
+               @Override
+               public TypeInformation<Event> getProducedType() {
+                       return typeInformation;
+               }
+       }
+
+       private static class EventKeySelectorWithCustomKeyTypeInformation
+               implements KeySelector<Event, Integer>, 
ResultTypeQueryable<Integer> {
+
+               private final SingleThreadAccessCheckingTypeInfo<Integer> 
typeInformation = new SingleThreadAccessCheckingTypeInfo<>(Integer.class);
+
+               @Override
+               public Integer getKey(Event value) {
+                       return value.getKey();
+               }
+
+               @Override
+               public TypeInformation<Integer> getProducedType() {
+                       return typeInformation;
+               }
+       }
 }
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index e282a1e..19d1f72 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -52,6 +52,17 @@ import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+import static org.apache.flink.streaming.tests.TestOperatorEnum.EVENT_SOURCE;
+import static 
org.apache.flink.streaming.tests.TestOperatorEnum.FAILURE_MAPPER_NAME;
+import static 
org.apache.flink.streaming.tests.TestOperatorEnum.KEYED_STATE_OPER_WITH_AVRO_SER;
+import static 
org.apache.flink.streaming.tests.TestOperatorEnum.KEYED_STATE_OPER_WITH_KRYO_AND_CUSTOM_SER;
+import static 
org.apache.flink.streaming.tests.TestOperatorEnum.OPERATOR_STATE_OPER;
+import static 
org.apache.flink.streaming.tests.TestOperatorEnum.SEMANTICS_CHECK_MAPPER;
+import static 
org.apache.flink.streaming.tests.TestOperatorEnum.SEMANTICS_CHECK_PRINT_SINK;
+import static 
org.apache.flink.streaming.tests.TestOperatorEnum.SLIDING_WINDOW_AGG;
+import static 
org.apache.flink.streaming.tests.TestOperatorEnum.SLIDING_WINDOW_CHECK_MAPPER;
+import static 
org.apache.flink.streaming.tests.TestOperatorEnum.SLIDING_WINDOW_CHECK_PRINT_SINK;
+import static 
org.apache.flink.streaming.tests.TestOperatorEnum.TIME_WINDOW_OPER;
 
 /**
  * A general purpose test job for Flink's DataStream API operators and 
primitives.
@@ -70,13 +81,6 @@ import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.
  *
  */
 public class DataStreamAllroundTestProgram {
-       private static final String KEYED_STATE_OPER_NAME = 
"ArtificalKeyedStateMapper";
-       private static final String OPERATOR_STATE_OPER_NAME = 
"ArtificalOperatorStateMapper";
-       private static final String TIME_WINDOW_OPER_NAME = 
"TumblingWindowOperator";
-       private static final String SEMANTICS_CHECK_MAPPER_NAME = 
"SemanticsCheckMapper";
-       private static final String FAILURE_MAPPER_NAME = "FailureMapper";
-       private static final String SLIDING_WINDOW_CHECK_MAPPER_NAME = 
"SlidingWindowCheckMapper";
-       private static final String SLIDING_WINDOW_AGG_NAME = 
"SlidingWindowOperator";
 
        public static void main(String[] args) throws Exception {
                final ParameterTool pt = ParameterTool.fromArgs(args);
@@ -86,7 +90,10 @@ public class DataStreamAllroundTestProgram {
                setupEnvironment(env, pt);
 
                // add a keyed stateful map operator, which uses Kryo for state 
serialization
-               DataStream<Event> eventStream = 
env.addSource(createEventSource(pt)).uid("0001")
+               DataStream<Event> eventStream = env
+                       .addSource(createEventSource(pt))
+                       .name(EVENT_SOURCE.getName())
+                       .uid(EVENT_SOURCE.getUid())
                        
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
                        .keyBy(Event::getKey)
                        .map(createArtificialKeyedStateMapper(
@@ -94,18 +101,21 @@ public class DataStreamAllroundTestProgram {
                                        (MapFunction<Event, Event>) in -> in,
                                        // state is verified and updated per 
event as a wrapped ComplexPayload state object
                                        (Event event, ComplexPayload lastState) 
-> {
-                                                       if (lastState != null 
&& !lastState.getStrPayload().equals(KEYED_STATE_OPER_NAME)
+                                                       if (lastState != null 
&& 
!lastState.getStrPayload().equals(KEYED_STATE_OPER_WITH_KRYO_AND_CUSTOM_SER.getName())
                                                                        && 
lastState.getInnerPayLoad().getSequenceNumber() == (event.getSequenceNumber() - 
1)) {
                                                                
System.out.println("State is set or restored incorrectly");
                                                        }
-                                                       return new 
ComplexPayload(event, KEYED_STATE_OPER_NAME);
+                                                       return new 
ComplexPayload(event, KEYED_STATE_OPER_WITH_KRYO_AND_CUSTOM_SER.getName());
                                                },
                                        Arrays.asList(
                                                new 
KryoSerializer<>(ComplexPayload.class, env.getConfig()), // KryoSerializer
                                                new 
StatefulComplexPayloadSerializer()), // custom stateful serializer
                                        
Collections.singletonList(ComplexPayload.class) // KryoSerializer via type 
extraction
                                )
-                       ).returns(Event.class).name(KEYED_STATE_OPER_NAME + 
"_Kryo_and_Custom_Stateful").uid("0002");
+                       )
+                       .returns(Event.class)
+                       
.name(KEYED_STATE_OPER_WITH_KRYO_AND_CUSTOM_SER.getName())
+                       
.uid(KEYED_STATE_OPER_WITH_KRYO_AND_CUSTOM_SER.getUid());
 
                // add a keyed stateful map operator, which uses Avro for state 
serialization
                eventStream = eventStream
@@ -115,7 +125,7 @@ public class DataStreamAllroundTestProgram {
                                        (MapFunction<Event, Event>) in -> in,
                                        // state is verified and updated per 
event as a wrapped ComplexPayloadAvro state object
                                        (Event event, ComplexPayloadAvro 
lastState) -> {
-                                                       if (lastState != null 
&& !lastState.getStrPayload().equals(KEYED_STATE_OPER_NAME)
+                                                       if (lastState != null 
&& !lastState.getStrPayload().equals(KEYED_STATE_OPER_WITH_AVRO_SER.getName())
                                                                        && 
lastState.getInnerPayLoad().getSequenceNumber() == (event.getSequenceNumber() - 
1)) {
                                                                
System.out.println("State is set or restored incorrectly");
                                                        }
@@ -123,7 +133,7 @@ public class DataStreamAllroundTestProgram {
                                                        ComplexPayloadAvro 
payload = new ComplexPayloadAvro();
                                                        
payload.setEventTime(event.getEventTime());
                                                        
payload.setInnerPayLoad(new InnerPayLoadAvro(event.getSequenceNumber()));
-                                                       
payload.setStrPayload(KEYED_STATE_OPER_NAME);
+                                                       
payload.setStrPayload(KEYED_STATE_OPER_WITH_AVRO_SER.getName());
                                                        
payload.setStringList(Arrays.asList(String.valueOf(event.getKey()), 
event.getPayload()));
 
                                                        return payload;
@@ -132,12 +142,16 @@ public class DataStreamAllroundTestProgram {
                                                new 
AvroSerializer<>(ComplexPayloadAvro.class)), // custom AvroSerializer
                                        
Collections.singletonList(ComplexPayloadAvro.class) // AvroSerializer via type 
extraction
                                )
-                       ).returns(Event.class).name(KEYED_STATE_OPER_NAME + 
"_Avro").uid("0003");
+                       )
+                       .returns(Event.class)
+                       .name(KEYED_STATE_OPER_WITH_AVRO_SER.getName())
+                       .uid(KEYED_STATE_OPER_WITH_AVRO_SER.getUid());
 
                DataStream<Event> eventStream2 = eventStream
                        
.map(createArtificialOperatorStateMapper((MapFunction<Event, Event>) in -> in))
                        .returns(Event.class)
-                       .name(OPERATOR_STATE_OPER_NAME).uid("0004");
+                       .name(OPERATOR_STATE_OPER.getName())
+                       .uid(OPERATOR_STATE_OPER.getUid());
 
                // apply a tumbling window that simply passes forward window 
elements;
                // this allows the job to cover timers state
@@ -150,21 +164,27 @@ public class DataStreamAllroundTestProgram {
                                                out.collect(e);
                                        }
                                }
-                       }).name(TIME_WINDOW_OPER_NAME).uid("0005");
+                       })
+                       .name(TIME_WINDOW_OPER.getName())
+                       .uid(TIME_WINDOW_OPER.getUid());
+
+               eventStream3 = 
DataStreamAllroundTestJobFactory.verifyCustomStatefulTypeSerializer(eventStream3);
 
                if (isSimulateFailures(pt)) {
                        eventStream3 = eventStream3
                                .map(createFailureMapper(pt))
                                .setParallelism(1)
-                               .name(FAILURE_MAPPER_NAME).uid("0006");
+                               .name(FAILURE_MAPPER_NAME.getName())
+                               .uid(FAILURE_MAPPER_NAME.getUid());
                }
 
                eventStream3.keyBy(Event::getKey)
                        .flatMap(createSemanticsCheckMapper(pt))
-                       .name(SEMANTICS_CHECK_MAPPER_NAME)
-                       .uid("007")
+                       .name(SEMANTICS_CHECK_MAPPER.getName())
+                       .uid(SEMANTICS_CHECK_MAPPER.getUid())
                        .addSink(new PrintSinkFunction<>())
-                       .uid("008");
+                       .name(SEMANTICS_CHECK_PRINT_SINK.getName())
+                       .uid(SEMANTICS_CHECK_PRINT_SINK.getUid());
 
                // Check sliding windows aggregations. Output all elements 
assigned to a window and later on
                // check if each event was emitted slide_factor number of times
@@ -181,15 +201,16 @@ public class DataStreamAllroundTestProgram {
                                        out.collect(Tuple2.of(key, 
StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList())));
                                }
                        })
-                       .name(SLIDING_WINDOW_AGG_NAME)
-                       .uid("009");
+                       .name(SLIDING_WINDOW_AGG.getName())
+                       .uid(SLIDING_WINDOW_AGG.getUid());
 
                eventStream4.keyBy(events -> events.f0)
                        .flatMap(createSlidingWindowCheckMapper(pt))
-                       .uid("010")
-                       .name(SLIDING_WINDOW_CHECK_MAPPER_NAME)
+                       .name(SLIDING_WINDOW_CHECK_MAPPER.getName())
+                       .uid(SLIDING_WINDOW_CHECK_MAPPER.getUid())
                        .addSink(new PrintSinkFunction<>())
-                       .uid("011");
+                       .name(SLIDING_WINDOW_CHECK_PRINT_SINK.getName())
+                       .uid(SLIDING_WINDOW_CHECK_PRINT_SINK.getUid());
 
                env.execute("General purpose test job");
        }
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SingleThreadAccessCheckingTypeInfo.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SingleThreadAccessCheckingTypeInfo.java
new file mode 100644
index 0000000..bf7c670
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SingleThreadAccessCheckingTypeInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.api.common.typeutils.SingleThreadAccessCheckingTypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+/** Custom {@link TypeInformation} to test custom {@link TypeSerializer}. */
+public class SingleThreadAccessCheckingTypeInfo<T> extends TypeInformation<T> {
+       private final TypeInformation<T> originalTypeInformation;
+
+       SingleThreadAccessCheckingTypeInfo(Class<T> clazz) {
+               this(TypeInformation.of(clazz));
+       }
+
+       private SingleThreadAccessCheckingTypeInfo(TypeInformation<T> 
originalTypeInformation) {
+               this.originalTypeInformation = originalTypeInformation;
+       }
+
+       @Override
+       public boolean isBasicType() {
+               return originalTypeInformation.isBasicType();
+       }
+
+       @Override
+       public boolean isTupleType() {
+               return originalTypeInformation.isTupleType();
+       }
+
+       @Override
+       public int getArity() {
+               return originalTypeInformation.getArity();
+       }
+
+       @Override
+       public int getTotalFields() {
+               return originalTypeInformation.getTotalFields();
+       }
+
+       @Override
+       public Class<T> getTypeClass() {
+               return originalTypeInformation.getTypeClass();
+       }
+
+       @Override
+       public boolean isKeyType() {
+               return originalTypeInformation.isKeyType();
+       }
+
+       @Override
+       public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+               return new 
SingleThreadAccessCheckingTypeSerializer<>(originalTypeInformation.createSerializer(config));
+       }
+
+       @Override
+       public String toString() {
+               return originalTypeInformation.toString();
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o){
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               SingleThreadAccessCheckingTypeInfo that = 
(SingleThreadAccessCheckingTypeInfo) o;
+               return Objects.equals(originalTypeInformation, 
that.originalTypeInformation);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(originalTypeInformation);
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof SingleThreadAccessCheckingTypeInfo;
+       }
+}
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/TestOperatorEnum.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/TestOperatorEnum.java
new file mode 100644
index 0000000..4769ced
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/TestOperatorEnum.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+/**
+ * Enum of names and uids of all test operators used in {@link DataStreamAllroundTestProgram}.
+ *
+ * <p>Each constant carries a name and a uid. The uid is the numeric id passed to the
+ * constructor, rendered as a zero-padded decimal string of at least four digits
+ * (for example {@code 1} becomes {@code "0001"}).
+ */
+public enum TestOperatorEnum {
+       // NOTE: the "Artifical" spelling is part of the runtime name strings and is kept as-is.
+       EVENT_SOURCE("EventSource", 1),
+       KEYED_STATE_OPER_WITH_KRYO_AND_CUSTOM_SER("ArtificalKeyedStateMapper_Kryo_and_Custom_Stateful", 2),
+       KEYED_STATE_OPER_WITH_AVRO_SER("ArtificalKeyedStateMapper_Avro", 3),
+       OPERATOR_STATE_OPER("ArtificalOperatorStateMapper", 4),
+       TIME_WINDOW_OPER("TumblingWindowOperator", 5),
+       FAILURE_MAPPER_NAME("FailureMapper", 6),
+       SEMANTICS_CHECK_MAPPER("SemanticsCheckMapper", 7),
+       SEMANTICS_CHECK_PRINT_SINK("SemanticsCheckPrintSink", 8),
+       SLIDING_WINDOW_AGG("SlidingWindowOperator", 9),
+       SLIDING_WINDOW_CHECK_MAPPER("SlidingWindowCheckMapper", 10),
+       SLIDING_WINDOW_CHECK_PRINT_SINK("SlidingWindowCheckPrintSink", 11),
+       RESULT_TYPE_QUERYABLE_MAPPER_WITH_CUSTOM_SER("ResultTypeQueryableMapWithCustomStatefulTypeSerializer", 12),
+       MAPPER_RETURNS_OUT_WITH_CUSTOM_SER("MapReturnsOutputWithCustomStatefulTypeSerializer", 13),
+       EVENT_IDENTITY_MAPPER("EventIdentityMapper", 14);
+
+       /** Operator name exactly as given in the constant declaration. */
+       private final String name;
+
+       /** Zero-padded string form of the numeric id given in the constant declaration. */
+       private final String uid;
+
+       /**
+        * Creates an enum constant.
+        *
+        * @param name the operator name, stored unchanged
+        * @param uid the numeric id, stored as a decimal string left-padded with zeros to four digits
+        */
+       TestOperatorEnum(String name, int uid) {
+               this.name = name;
+               // Locale.ROOT keeps the digits ASCII no matter what the JVM default locale is;
+               // the locale-less String.format overload would localize them.
+               this.uid = String.format(java.util.Locale.ROOT, "%04d", uid);
+       }
+
+       /**
+        * Returns the operator name.
+        *
+        * @return the name passed to the constructor
+        */
+       public String getName() {
+               return name;
+       }
+
+       /**
+        * Returns the operator uid.
+        *
+        * @return the zero-padded uid string, e.g. {@code "0001"}
+        */
+       public String getUid() {
+               return uid;
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
index 044abaa..8450076 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
@@ -21,29 +21,25 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
+import 
org.apache.flink.api.common.typeutils.SingleThreadAccessCheckingTypeSerializer;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.RunnableFuture;
-
-import static org.junit.Assert.assertEquals;
+import java.util.concurrent.atomic.AtomicReference;
 
 class StateSnapshotTransformerTest {
        private final AbstractKeyedStateBackend<Integer> backend;
@@ -130,7 +126,7 @@ class StateSnapshotTransformerTest {
                private TestListState() throws Exception {
                        this.state = backend.createInternalState(
                                VoidNamespaceSerializer.INSTANCE,
-                               new ListStateDescriptor<>("TestListState", new 
SingleThreadAccessCheckingTypeSerializer()),
+                               new ListStateDescriptor<>("TestListState", new 
SingleThreadAccessCheckingTypeSerializer<>(StringSerializer.INSTANCE)),
                                snapshotTransformFactory);
                        state.setCurrentNamespace(VoidNamespace.INSTANCE);
                }
@@ -199,100 +195,14 @@ class StateSnapshotTransformerTest {
                }
        }
 
-       private static class SingleThreadAccessCheckingTypeSerializer extends 
TypeSerializer<String> {
-               private final SingleThreadAccessChecker 
singleThreadAccessChecker = new SingleThreadAccessChecker();
-
-               @Override
-               public boolean isImmutableType() {
-                       singleThreadAccessChecker.checkSingleThreadAccess();
-                       return StringSerializer.INSTANCE.isImmutableType();
-               }
-
-               @Override
-               public TypeSerializer<String> duplicate() {
-                       singleThreadAccessChecker.checkSingleThreadAccess();
-                       return new SingleThreadAccessCheckingTypeSerializer();
-               }
-
-               @Override
-               public String createInstance() {
-                       singleThreadAccessChecker.checkSingleThreadAccess();
-                       return StringSerializer.INSTANCE.createInstance();
-               }
-
-               @Override
-               public String copy(String from) {
-                       singleThreadAccessChecker.checkSingleThreadAccess();
-                       return StringSerializer.INSTANCE.copy(from);
-               }
-
-               @Override
-               public String copy(String from, String reuse) {
-                       singleThreadAccessChecker.checkSingleThreadAccess();
-                       return StringSerializer.INSTANCE.copy(from, reuse);
-               }
-
-               @Override
-               public int getLength() {
-                       singleThreadAccessChecker.checkSingleThreadAccess();
-                       return StringSerializer.INSTANCE.getLength();
-               }
-
-               @Override
-               public void serialize(String record, DataOutputView target) 
throws IOException {
-                       singleThreadAccessChecker.checkSingleThreadAccess();
-                       StringSerializer.INSTANCE.serialize(record, target);
-               }
-
-               @Override
-               public String deserialize(DataInputView source) throws 
IOException {
-                       singleThreadAccessChecker.checkSingleThreadAccess();
-                       return StringSerializer.INSTANCE.deserialize(source);
-               }
-
-               @Override
-               public String deserialize(String reuse, DataInputView source) 
throws IOException {
-                       singleThreadAccessChecker.checkSingleThreadAccess();
-                       return StringSerializer.INSTANCE.deserialize(reuse, 
source);
-               }
-
-               @Override
-               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
-                       singleThreadAccessChecker.checkSingleThreadAccess();
-                       StringSerializer.INSTANCE.copy(source, target);
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       singleThreadAccessChecker.checkSingleThreadAccess();
-                       return obj == this ||
-                               (obj != null && obj.getClass() == getClass() &&
-                                       StringSerializer.INSTANCE.equals(obj));
-               }
-
-               @Override
-               public int hashCode() {
-                       singleThreadAccessChecker.checkSingleThreadAccess();
-                       return StringSerializer.INSTANCE.hashCode();
-               }
-
-               @Override
-               public TypeSerializerSnapshot<String> snapshotConfiguration() {
-                       singleThreadAccessChecker.checkSingleThreadAccess();
-                       return 
StringSerializer.INSTANCE.snapshotConfiguration();
-               }
-       }
+       private static class SingleThreadAccessChecker implements Serializable {
+               private static final long serialVersionUID = 
131020282727167064L;
 
-       private static class SingleThreadAccessChecker {
-               private Thread currentThread = null;
+               private final AtomicReference<Thread> currentThreadRef = new 
AtomicReference<>();
 
                void checkSingleThreadAccess() {
-                       if (currentThread == null) {
-                               currentThread = Thread.currentThread();
-                       } else {
-                               assertEquals("Concurrent access from another 
thread",
-                                       currentThread, Thread.currentThread());
-                       }
+                       currentThreadRef.compareAndSet(null, 
Thread.currentThread());
+                       assert 
(Thread.currentThread().equals(currentThreadRef.get())) : "Concurrent access 
from another thread";
                }
        }
 }

Reply via email to