This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ae7861dd0a124213dd89b434175093dc64ee1904 Author: Andrey Zagrebin <[email protected]> AuthorDate: Tue Feb 26 19:43:21 2019 +0100 [FLINK-9003][E2E] Add operators with input going through custom, stateful serialization. --- .../SingleThreadAccessCheckingTypeSerializer.java | 203 +++++++++++++++++++++ .../flink/api/common/typeutils/TypeSerializer.java | 5 +- .../tests/DataStreamAllroundTestJobFactory.java | 60 ++++++ .../tests/DataStreamAllroundTestProgram.java | 71 ++++--- .../tests/SingleThreadAccessCheckingTypeInfo.java | 101 ++++++++++ .../flink/streaming/tests/TestOperatorEnum.java | 53 ++++++ .../state/StateSnapshotTransformerTest.java | 108 +---------- 7 files changed, 474 insertions(+), 127 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java new file mode 100644 index 0000000..d2ffb97 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicReference; + +@Internal +public class SingleThreadAccessCheckingTypeSerializer<T> extends TypeSerializer<T> { + private static final long serialVersionUID = 131020282727167064L; + + private final SingleThreadAccessChecker singleThreadAccessChecker; + private final TypeSerializer<T> originalSerializer; + + public SingleThreadAccessCheckingTypeSerializer(TypeSerializer<T> originalSerializer) { + this.singleThreadAccessChecker = new SingleThreadAccessChecker(); + this.originalSerializer = originalSerializer; + } + + @Override + public boolean isImmutableType() { + try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + return originalSerializer.isImmutableType(); + } + } + + @Override + public TypeSerializer<T> duplicate() { + try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + return new SingleThreadAccessCheckingTypeSerializer<>(originalSerializer.duplicate()); + } + } + + @Override + public T createInstance() { + try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + return originalSerializer.createInstance(); + } + } + + @Override + public T copy(T from) { + try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + return originalSerializer.copy(from); + } + } + + @Override + public T copy(T from, T reuse) { + try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + return originalSerializer.copy(from, reuse); + } + } + + @Override + public int getLength() { + try 
(SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + return originalSerializer.getLength(); + } + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + originalSerializer.serialize(record, target); + } + } + + @Override + public T deserialize(DataInputView source) throws IOException { + try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + return originalSerializer.deserialize(source); + } + } + + @Override + public T deserialize(T reuse, DataInputView source) throws IOException { + try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + return originalSerializer.deserialize(reuse, source); + } + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + originalSerializer.copy(source, target); + } + } + + @Override + public boolean equals(Object obj) { + try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + return obj == this || + (obj != null && obj.getClass() == getClass() && + originalSerializer.equals(obj)); + } + } + + @Override + public int hashCode() { + try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + return originalSerializer.hashCode(); + } + } + + @Override + public TypeSerializerSnapshot<T> snapshotConfiguration() { + try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + return new SingleThreadAccessCheckingTypeSerializerSnapshot<>(this); + } + } + + public static class SingleThreadAccessCheckingTypeSerializerSnapshot<T> + extends CompositeTypeSerializerSnapshot<T, 
SingleThreadAccessCheckingTypeSerializer<T>> { + + @SuppressWarnings({"unchecked", "unused"}) + public SingleThreadAccessCheckingTypeSerializerSnapshot() { + super((Class<SingleThreadAccessCheckingTypeSerializer<T>>) (Class<?>) SingleThreadAccessCheckingTypeSerializer.class); + } + + SingleThreadAccessCheckingTypeSerializerSnapshot(SingleThreadAccessCheckingTypeSerializer<T> serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return 1; + } + + @Override + protected TypeSerializer<?>[] getNestedSerializers(SingleThreadAccessCheckingTypeSerializer<T> outerSerializer) { + return new TypeSerializer[] { outerSerializer.originalSerializer }; + } + + @SuppressWarnings("unchecked") + @Override + protected SingleThreadAccessCheckingTypeSerializer<T> createOuterSerializerWithNestedSerializers( + TypeSerializer<?>[] nestedSerializers) { + + return new SingleThreadAccessCheckingTypeSerializer<>((TypeSerializer<T>) nestedSerializers[0]); + } + } + + private void writeObject(ObjectOutputStream outputStream) throws IOException { + try (SingleThreadAccessCheck ignored = singleThreadAccessChecker.startSingleThreadAccessCheck()) { + outputStream.defaultWriteObject(); + } + } + + private static class SingleThreadAccessChecker implements Serializable { + private static final long serialVersionUID = 131020282727167064L; + + private transient AtomicReference<Thread> currentThreadRef = new AtomicReference<>(); + + SingleThreadAccessCheck startSingleThreadAccessCheck() { + assert(currentThreadRef.compareAndSet(null, Thread.currentThread())) : + "The checker has concurrent access from " + currentThreadRef.get(); + return new SingleThreadAccessCheck(currentThreadRef); + } + + private void readObject(ObjectInputStream inputStream) throws ClassNotFoundException, IOException { + inputStream.defaultReadObject(); + currentThreadRef = new AtomicReference<>(); + } + } + + private static class SingleThreadAccessCheck implements 
AutoCloseable { + private final AtomicReference<Thread> currentThreadRef; + + private SingleThreadAccessCheck(AtomicReference<Thread> currentThreadRef) { + this.currentThreadRef = currentThreadRef; + } + + @Override + public void close() { + assert(currentThreadRef.compareAndSet(Thread.currentThread(), null)) : + "The checker has concurrent access from " + currentThreadRef.get(); + } + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java index f0036c4..74d54b9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java @@ -29,9 +29,8 @@ import java.io.Serializable; * This interface describes the methods that are required for a data type to be handled by the Flink * runtime. Specifically, this interface contains the serialization and copying methods. * - * <p>The methods in this class are assumed to be stateless, such that it is effectively thread safe. Stateful - * implementations of the methods may lead to unpredictable side effects and will compromise both stability and - * correctness of the program. + * <p>The methods in this class are not necessarily thread safe. To avoid unpredictable side effects, + * it is recommended to call the {@code duplicate()} method and use one serializer instance per thread. 
* * <p><b>Upgrading TypeSerializers to the new TypeSerializerSnapshot model</b> * diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java index 06325ad..af39522 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java @@ -24,8 +24,11 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -34,6 +37,7 @@ import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.CheckpointConfig; @@ -52,6 +56,10 @@ import 
org.apache.flink.streaming.tests.artificialstate.builder.ArtificialValueS import java.util.ArrayList; import java.util.List; +import static org.apache.flink.streaming.tests.TestOperatorEnum.EVENT_IDENTITY_MAPPER; +import static org.apache.flink.streaming.tests.TestOperatorEnum.MAPPER_RETURNS_OUT_WITH_CUSTOM_SER; +import static org.apache.flink.streaming.tests.TestOperatorEnum.RESULT_TYPE_QUERYABLE_MAPPER_WITH_CUSTOM_SER; + /** * A factory for components of general purpose test jobs for Flink's DataStream API operators and primitives. * @@ -490,4 +498,56 @@ public class DataStreamAllroundTestJobFactory { TEST_SLIDE_FACTOR.defaultValue() )); } + + static DataStream<Event> verifyCustomStatefulTypeSerializer(DataStream<Event> eventStream) { + return eventStream + .map(new EventIdentityFunctionWithCustomEventTypeInformation()) + .name(RESULT_TYPE_QUERYABLE_MAPPER_WITH_CUSTOM_SER.getName()) + .uid(RESULT_TYPE_QUERYABLE_MAPPER_WITH_CUSTOM_SER.getUid()) + // apply a keyBy so that we have a non-chained operator with Event as input type that goes through serialization + .keyBy(new EventKeySelectorWithCustomKeyTypeInformation()) + + .map(e -> e) + .returns(new SingleThreadAccessCheckingTypeInfo<>(Event.class)) + .name(MAPPER_RETURNS_OUT_WITH_CUSTOM_SER.getName()) + .uid(MAPPER_RETURNS_OUT_WITH_CUSTOM_SER.getUid()) + // apply a keyBy so that we have a non-chained operator with Event as input type that goes through serialization + .keyBy(new EventKeySelectorWithCustomKeyTypeInformation()) + + .map(e -> e) + .name(EVENT_IDENTITY_MAPPER.getName()) + .uid(EVENT_IDENTITY_MAPPER.getUid()); + } + + private static class EventIdentityFunctionWithCustomEventTypeInformation + implements MapFunction<Event, Event>, ResultTypeQueryable<Event> { + + private final SingleThreadAccessCheckingTypeInfo<Event> typeInformation = new SingleThreadAccessCheckingTypeInfo<>(Event.class); + + @Override + public Event map(Event value) { + return value; + } + + @Override + public TypeInformation<Event> 
getProducedType() { + return typeInformation; + } + } + + private static class EventKeySelectorWithCustomKeyTypeInformation + implements KeySelector<Event, Integer>, ResultTypeQueryable<Integer> { + + private final SingleThreadAccessCheckingTypeInfo<Integer> typeInformation = new SingleThreadAccessCheckingTypeInfo<>(Integer.class); + + @Override + public Integer getKey(Event value) { + return value.getKey(); + } + + @Override + public TypeInformation<Integer> getProducedType() { + return typeInformation; + } + } } diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java index e282a1e..19d1f72 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java @@ -52,6 +52,17 @@ import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory. 
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment; +import static org.apache.flink.streaming.tests.TestOperatorEnum.EVENT_SOURCE; +import static org.apache.flink.streaming.tests.TestOperatorEnum.FAILURE_MAPPER_NAME; +import static org.apache.flink.streaming.tests.TestOperatorEnum.KEYED_STATE_OPER_WITH_AVRO_SER; +import static org.apache.flink.streaming.tests.TestOperatorEnum.KEYED_STATE_OPER_WITH_KRYO_AND_CUSTOM_SER; +import static org.apache.flink.streaming.tests.TestOperatorEnum.OPERATOR_STATE_OPER; +import static org.apache.flink.streaming.tests.TestOperatorEnum.SEMANTICS_CHECK_MAPPER; +import static org.apache.flink.streaming.tests.TestOperatorEnum.SEMANTICS_CHECK_PRINT_SINK; +import static org.apache.flink.streaming.tests.TestOperatorEnum.SLIDING_WINDOW_AGG; +import static org.apache.flink.streaming.tests.TestOperatorEnum.SLIDING_WINDOW_CHECK_MAPPER; +import static org.apache.flink.streaming.tests.TestOperatorEnum.SLIDING_WINDOW_CHECK_PRINT_SINK; +import static org.apache.flink.streaming.tests.TestOperatorEnum.TIME_WINDOW_OPER; /** * A general purpose test job for Flink's DataStream API operators and primitives. @@ -70,13 +81,6 @@ import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory. 
* */ public class DataStreamAllroundTestProgram { - private static final String KEYED_STATE_OPER_NAME = "ArtificalKeyedStateMapper"; - private static final String OPERATOR_STATE_OPER_NAME = "ArtificalOperatorStateMapper"; - private static final String TIME_WINDOW_OPER_NAME = "TumblingWindowOperator"; - private static final String SEMANTICS_CHECK_MAPPER_NAME = "SemanticsCheckMapper"; - private static final String FAILURE_MAPPER_NAME = "FailureMapper"; - private static final String SLIDING_WINDOW_CHECK_MAPPER_NAME = "SlidingWindowCheckMapper"; - private static final String SLIDING_WINDOW_AGG_NAME = "SlidingWindowOperator"; public static void main(String[] args) throws Exception { final ParameterTool pt = ParameterTool.fromArgs(args); @@ -86,7 +90,10 @@ public class DataStreamAllroundTestProgram { setupEnvironment(env, pt); // add a keyed stateful map operator, which uses Kryo for state serialization - DataStream<Event> eventStream = env.addSource(createEventSource(pt)).uid("0001") + DataStream<Event> eventStream = env + .addSource(createEventSource(pt)) + .name(EVENT_SOURCE.getName()) + .uid(EVENT_SOURCE.getUid()) .assignTimestampsAndWatermarks(createTimestampExtractor(pt)) .keyBy(Event::getKey) .map(createArtificialKeyedStateMapper( @@ -94,18 +101,21 @@ public class DataStreamAllroundTestProgram { (MapFunction<Event, Event>) in -> in, // state is verified and updated per event as a wrapped ComplexPayload state object (Event event, ComplexPayload lastState) -> { - if (lastState != null && !lastState.getStrPayload().equals(KEYED_STATE_OPER_NAME) + if (lastState != null && !lastState.getStrPayload().equals(KEYED_STATE_OPER_WITH_KRYO_AND_CUSTOM_SER.getName()) && lastState.getInnerPayLoad().getSequenceNumber() == (event.getSequenceNumber() - 1)) { System.out.println("State is set or restored incorrectly"); } - return new ComplexPayload(event, KEYED_STATE_OPER_NAME); + return new ComplexPayload(event, KEYED_STATE_OPER_WITH_KRYO_AND_CUSTOM_SER.getName()); }, Arrays.asList( 
new KryoSerializer<>(ComplexPayload.class, env.getConfig()), // KryoSerializer new StatefulComplexPayloadSerializer()), // custom stateful serializer Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction ) - ).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo_and_Custom_Stateful").uid("0002"); + ) + .returns(Event.class) + .name(KEYED_STATE_OPER_WITH_KRYO_AND_CUSTOM_SER.getName()) + .uid(KEYED_STATE_OPER_WITH_KRYO_AND_CUSTOM_SER.getUid()); // add a keyed stateful map operator, which uses Avro for state serialization eventStream = eventStream @@ -115,7 +125,7 @@ public class DataStreamAllroundTestProgram { (MapFunction<Event, Event>) in -> in, // state is verified and updated per event as a wrapped ComplexPayloadAvro state object (Event event, ComplexPayloadAvro lastState) -> { - if (lastState != null && !lastState.getStrPayload().equals(KEYED_STATE_OPER_NAME) + if (lastState != null && !lastState.getStrPayload().equals(KEYED_STATE_OPER_WITH_AVRO_SER.getName()) && lastState.getInnerPayLoad().getSequenceNumber() == (event.getSequenceNumber() - 1)) { System.out.println("State is set or restored incorrectly"); } @@ -123,7 +133,7 @@ public class DataStreamAllroundTestProgram { ComplexPayloadAvro payload = new ComplexPayloadAvro(); payload.setEventTime(event.getEventTime()); payload.setInnerPayLoad(new InnerPayLoadAvro(event.getSequenceNumber())); - payload.setStrPayload(KEYED_STATE_OPER_NAME); + payload.setStrPayload(KEYED_STATE_OPER_WITH_AVRO_SER.getName()); payload.setStringList(Arrays.asList(String.valueOf(event.getKey()), event.getPayload())); return payload; @@ -132,12 +142,16 @@ public class DataStreamAllroundTestProgram { new AvroSerializer<>(ComplexPayloadAvro.class)), // custom AvroSerializer Collections.singletonList(ComplexPayloadAvro.class) // AvroSerializer via type extraction ) - ).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro").uid("0003"); + ) + .returns(Event.class) + 
.name(KEYED_STATE_OPER_WITH_AVRO_SER.getName()) + .uid(KEYED_STATE_OPER_WITH_AVRO_SER.getUid()); DataStream<Event> eventStream2 = eventStream .map(createArtificialOperatorStateMapper((MapFunction<Event, Event>) in -> in)) .returns(Event.class) - .name(OPERATOR_STATE_OPER_NAME).uid("0004"); + .name(OPERATOR_STATE_OPER.getName()) + .uid(OPERATOR_STATE_OPER.getUid()); // apply a tumbling window that simply passes forward window elements; // this allows the job to cover timers state @@ -150,21 +164,27 @@ public class DataStreamAllroundTestProgram { out.collect(e); } } - }).name(TIME_WINDOW_OPER_NAME).uid("0005"); + }) + .name(TIME_WINDOW_OPER.getName()) + .uid(TIME_WINDOW_OPER.getUid()); + + eventStream3 = DataStreamAllroundTestJobFactory.verifyCustomStatefulTypeSerializer(eventStream3); if (isSimulateFailures(pt)) { eventStream3 = eventStream3 .map(createFailureMapper(pt)) .setParallelism(1) - .name(FAILURE_MAPPER_NAME).uid("0006"); + .name(FAILURE_MAPPER_NAME.getName()) + .uid(FAILURE_MAPPER_NAME.getUid()); } eventStream3.keyBy(Event::getKey) .flatMap(createSemanticsCheckMapper(pt)) - .name(SEMANTICS_CHECK_MAPPER_NAME) - .uid("007") + .name(SEMANTICS_CHECK_MAPPER.getName()) + .uid(SEMANTICS_CHECK_MAPPER.getUid()) .addSink(new PrintSinkFunction<>()) - .uid("008"); + .name(SEMANTICS_CHECK_PRINT_SINK.getName()) + .uid(SEMANTICS_CHECK_PRINT_SINK.getUid()); // Check sliding windows aggregations. 
Output all elements assigned to a window and later on // check if each event was emitted slide_factor number of times @@ -181,15 +201,16 @@ public class DataStreamAllroundTestProgram { out.collect(Tuple2.of(key, StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList()))); } }) - .name(SLIDING_WINDOW_AGG_NAME) - .uid("009"); + .name(SLIDING_WINDOW_AGG.getName()) + .uid(SLIDING_WINDOW_AGG.getUid()); eventStream4.keyBy(events -> events.f0) .flatMap(createSlidingWindowCheckMapper(pt)) - .uid("010") - .name(SLIDING_WINDOW_CHECK_MAPPER_NAME) + .name(SLIDING_WINDOW_CHECK_MAPPER.getName()) + .uid(SLIDING_WINDOW_CHECK_MAPPER.getUid()) .addSink(new PrintSinkFunction<>()) - .uid("011"); + .name(SLIDING_WINDOW_CHECK_PRINT_SINK.getName()) + .uid(SLIDING_WINDOW_CHECK_PRINT_SINK.getUid()); env.execute("General purpose test job"); } diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SingleThreadAccessCheckingTypeInfo.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SingleThreadAccessCheckingTypeInfo.java new file mode 100644 index 0000000..bf7c670 --- /dev/null +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SingleThreadAccessCheckingTypeInfo.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.SingleThreadAccessCheckingTypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.util.Objects; + +/** Custom {@link TypeInformation} to test custom {@link TypeSerializer}. */ +public class SingleThreadAccessCheckingTypeInfo<T> extends TypeInformation<T> { + private final TypeInformation<T> originalTypeInformation; + + SingleThreadAccessCheckingTypeInfo(Class<T> clazz) { + this(TypeInformation.of(clazz)); + } + + private SingleThreadAccessCheckingTypeInfo(TypeInformation<T> originalTypeInformation) { + this.originalTypeInformation = originalTypeInformation; + } + + @Override + public boolean isBasicType() { + return originalTypeInformation.isBasicType(); + } + + @Override + public boolean isTupleType() { + return originalTypeInformation.isTupleType(); + } + + @Override + public int getArity() { + return originalTypeInformation.getArity(); + } + + @Override + public int getTotalFields() { + return originalTypeInformation.getTotalFields(); + } + + @Override + public Class<T> getTypeClass() { + return originalTypeInformation.getTypeClass(); + } + + @Override + public boolean isKeyType() { + return originalTypeInformation.isKeyType(); + } + + @Override + public TypeSerializer<T> createSerializer(ExecutionConfig config) { + return new 
SingleThreadAccessCheckingTypeSerializer<>(originalTypeInformation.createSerializer(config)); + } + + @Override + public String toString() { + return originalTypeInformation.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o){ + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SingleThreadAccessCheckingTypeInfo that = (SingleThreadAccessCheckingTypeInfo) o; + return Objects.equals(originalTypeInformation, that.originalTypeInformation); + } + + @Override + public int hashCode() { + return Objects.hash(originalTypeInformation); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof SingleThreadAccessCheckingTypeInfo; + } +} diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/TestOperatorEnum.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/TestOperatorEnum.java new file mode 100644 index 0000000..4769ced --- /dev/null +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/TestOperatorEnum.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +/** Enum of names and uids of all test operators used in {@link DataStreamAllroundTestProgram}. */ +public enum TestOperatorEnum { + EVENT_SOURCE("EventSource", 1), + KEYED_STATE_OPER_WITH_KRYO_AND_CUSTOM_SER("ArtificalKeyedStateMapper_Kryo_and_Custom_Stateful", 2), + KEYED_STATE_OPER_WITH_AVRO_SER("ArtificalKeyedStateMapper_Avro", 3), + OPERATOR_STATE_OPER("ArtificalOperatorStateMapper", 4), + TIME_WINDOW_OPER("TumblingWindowOperator", 5), + FAILURE_MAPPER_NAME("FailureMapper", 6), + SEMANTICS_CHECK_MAPPER("SemanticsCheckMapper", 7), + SEMANTICS_CHECK_PRINT_SINK("SemanticsCheckPrintSink", 8), + SLIDING_WINDOW_AGG("SlidingWindowOperator", 9), + SLIDING_WINDOW_CHECK_MAPPER("SlidingWindowCheckMapper", 10), + SLIDING_WINDOW_CHECK_PRINT_SINK("SlidingWindowCheckPrintSink", 11), + RESULT_TYPE_QUERYABLE_MAPPER_WITH_CUSTOM_SER("ResultTypeQueryableMapWithCustomStatefulTypeSerializer", 12), + MAPPER_RETURNS_OUT_WITH_CUSTOM_SER("MapReturnsOutputWithCustomStatefulTypeSerializer", 13), + EVENT_IDENTITY_MAPPER("EventIdentityMapper", 14); + + private final String name; + private final String uid; + + TestOperatorEnum(String name, int uid) { + this.name = name; + this.uid = String.format("%04d", uid); + } + + public String getName() { + return name; + } + + public String getUid() { + return uid; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java index 044abaa..8450076 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java @@ -21,29 +21,25 @@ package org.apache.flink.runtime.state; import 
org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.api.common.typeutils.SingleThreadAccessCheckingTypeSerializer; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.util.StringUtils; import javax.annotation.Nullable; -import java.io.IOException; +import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.Random; import java.util.concurrent.RunnableFuture; - -import static org.junit.Assert.assertEquals; +import java.util.concurrent.atomic.AtomicReference; class StateSnapshotTransformerTest { private final AbstractKeyedStateBackend<Integer> backend; @@ -130,7 +126,7 @@ class StateSnapshotTransformerTest { private TestListState() throws Exception { this.state = backend.createInternalState( VoidNamespaceSerializer.INSTANCE, - new ListStateDescriptor<>("TestListState", new SingleThreadAccessCheckingTypeSerializer()), + new ListStateDescriptor<>("TestListState", new SingleThreadAccessCheckingTypeSerializer<>(StringSerializer.INSTANCE)), snapshotTransformFactory); state.setCurrentNamespace(VoidNamespace.INSTANCE); } @@ -199,100 +195,14 @@ class StateSnapshotTransformerTest 
{ } } - private static class SingleThreadAccessCheckingTypeSerializer extends TypeSerializer<String> { - private final SingleThreadAccessChecker singleThreadAccessChecker = new SingleThreadAccessChecker(); - - @Override - public boolean isImmutableType() { - singleThreadAccessChecker.checkSingleThreadAccess(); - return StringSerializer.INSTANCE.isImmutableType(); - } - - @Override - public TypeSerializer<String> duplicate() { - singleThreadAccessChecker.checkSingleThreadAccess(); - return new SingleThreadAccessCheckingTypeSerializer(); - } - - @Override - public String createInstance() { - singleThreadAccessChecker.checkSingleThreadAccess(); - return StringSerializer.INSTANCE.createInstance(); - } - - @Override - public String copy(String from) { - singleThreadAccessChecker.checkSingleThreadAccess(); - return StringSerializer.INSTANCE.copy(from); - } - - @Override - public String copy(String from, String reuse) { - singleThreadAccessChecker.checkSingleThreadAccess(); - return StringSerializer.INSTANCE.copy(from, reuse); - } - - @Override - public int getLength() { - singleThreadAccessChecker.checkSingleThreadAccess(); - return StringSerializer.INSTANCE.getLength(); - } - - @Override - public void serialize(String record, DataOutputView target) throws IOException { - singleThreadAccessChecker.checkSingleThreadAccess(); - StringSerializer.INSTANCE.serialize(record, target); - } - - @Override - public String deserialize(DataInputView source) throws IOException { - singleThreadAccessChecker.checkSingleThreadAccess(); - return StringSerializer.INSTANCE.deserialize(source); - } - - @Override - public String deserialize(String reuse, DataInputView source) throws IOException { - singleThreadAccessChecker.checkSingleThreadAccess(); - return StringSerializer.INSTANCE.deserialize(reuse, source); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - singleThreadAccessChecker.checkSingleThreadAccess(); - 
StringSerializer.INSTANCE.copy(source, target); - } - - @Override - public boolean equals(Object obj) { - singleThreadAccessChecker.checkSingleThreadAccess(); - return obj == this || - (obj != null && obj.getClass() == getClass() && - StringSerializer.INSTANCE.equals(obj)); - } - - @Override - public int hashCode() { - singleThreadAccessChecker.checkSingleThreadAccess(); - return StringSerializer.INSTANCE.hashCode(); - } - - @Override - public TypeSerializerSnapshot<String> snapshotConfiguration() { - singleThreadAccessChecker.checkSingleThreadAccess(); - return StringSerializer.INSTANCE.snapshotConfiguration(); - } - } + private static class SingleThreadAccessChecker implements Serializable { + private static final long serialVersionUID = 131020282727167064L; - private static class SingleThreadAccessChecker { - private Thread currentThread = null; + private final AtomicReference<Thread> currentThreadRef = new AtomicReference<>(); void checkSingleThreadAccess() { - if (currentThread == null) { - currentThread = Thread.currentThread(); - } else { - assertEquals("Concurrent access from another thread", - currentThread, Thread.currentThread()); - } + currentThreadRef.compareAndSet(null, Thread.currentThread()); + assert (Thread.currentThread().equals(currentThreadRef.get())) : "Concurrent access from another thread"; } } }
