[FLINK-8878] [core] Add concurrency check to Kryo Serializer on DEBUG level

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57ff6e89
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57ff6e89
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57ff6e89

Branch: refs/heads/master
Commit: 57ff6e8930db0bfdd8e7cbb8418d9a4b46ca4a61
Parents: 4201224
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Mar 4 12:20:17 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 7 18:10:34 2018 +0100

----------------------------------------------------------------------
 .../typeutils/runtime/kryo/KryoSerializer.java  | 191 +++++++++++++------
 .../kryo/KryoSerializerDebugInitHelper.java     |  47 +++++
 ...erializerConcurrencyCheckInactiveITCase.java |  62 ++++++
 .../kryo/KryoSerializerConcurrencyTest.java     |  95 +++++++++
 4 files changed, 338 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57ff6e89/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 06ba906..7c97c5c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -74,6 +74,12 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(KryoSerializer.class);
 
+       /** Flag whether to check for concurrent thread access.
+        * Because this flag is static final, a value of 'false' allows the JIT 
compiler to eliminate
+        * the guarded code sections. */
+       private static final boolean CONCURRENT_ACCESS_CHECK =
+                       LOG.isDebugEnabled() || 
KryoSerializerDebugInitHelper.setToDebug;
+
        static {
                configureKryoLogging();
        }
@@ -112,6 +118,9 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
        private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> 
registeredTypesWithSerializerClasses;
        private LinkedHashSet<Class<?>> registeredTypes;
 
+       // for debugging purposes
+       private transient volatile Thread currentThread;
+
        // 
------------------------------------------------------------------------
 
        public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
@@ -174,26 +183,38 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                if (from == null) {
                        return null;
                }
-               checkKryoInitialized();
-               try {
-                       return kryo.copy(from);
+
+               if (CONCURRENT_ACCESS_CHECK) {
+                       enterExclusiveThread();
                }
-               catch(KryoException ke) {
-                       // kryo was unable to copy it, so we do it through 
serialization:
-                       ByteArrayOutputStream baout = new 
ByteArrayOutputStream();
-                       Output output = new Output(baout);
 
-                       kryo.writeObject(output, from);
+               try {
+                       checkKryoInitialized();
+                       try {
+                               return kryo.copy(from);
+                       }
+                       catch (KryoException ke) {
+                               // kryo was unable to copy it, so we do it 
through serialization:
+                               ByteArrayOutputStream baout = new 
ByteArrayOutputStream();
+                               Output output = new Output(baout);
+
+                               kryo.writeObject(output, from);
 
-                       output.close();
+                               output.close();
 
-                       ByteArrayInputStream bain = new 
ByteArrayInputStream(baout.toByteArray());
-                       Input input = new Input(bain);
+                               ByteArrayInputStream bain = new 
ByteArrayInputStream(baout.toByteArray());
+                               Input input = new Input(bain);
 
-                       return (T)kryo.readObject(input, from.getClass());
+                               return (T)kryo.readObject(input, 
from.getClass());
+                       }
+               }
+               finally {
+                       if (CONCURRENT_ACCESS_CHECK) {
+                               exitExclusiveThread();
+                       }
                }
        }
-       
+
        @Override
        public T copy(T from, T reuse) {
                return copy(from);
@@ -206,35 +227,47 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
        @Override
        public void serialize(T record, DataOutputView target) throws 
IOException {
-               checkKryoInitialized();
-               if (target != previousOut) {
-                       DataOutputViewStream outputStream = new 
DataOutputViewStream(target);
-                       output = new Output(outputStream);
-                       previousOut = target;
-               }
-
-               // Sanity check: Make sure that the output is cleared/has been 
flushed by the last call
-               // otherwise data might be written multiple times in case of a 
previous EOFException
-               if (output.position() != 0) {
-                       throw new IllegalStateException("The Kryo Output still 
contains data from a previous " +
-                               "serialize call. It has to be flushed or 
cleared at the end of the serialize call.");
+               if (CONCURRENT_ACCESS_CHECK) {
+                       enterExclusiveThread();
                }
 
                try {
-                       kryo.writeClassAndObject(output, record);
-                       output.flush();
-               }
-               catch (KryoException ke) {
-                       // make sure that the Kryo output buffer is cleared in 
case that we can recover from
-                       // the exception (e.g. EOFException which denotes 
buffer full)
-                       output.clear();
-
-                       Throwable cause = ke.getCause();
-                       if (cause instanceof EOFException) {
-                               throw (EOFException) cause;
+                       checkKryoInitialized();
+
+                       if (target != previousOut) {
+                               DataOutputViewStream outputStream = new 
DataOutputViewStream(target);
+                               output = new Output(outputStream);
+                               previousOut = target;
+                       }
+
+                       // Sanity check: Make sure that the output is 
cleared/has been flushed by the last call
+                       // otherwise data might be written multiple times in 
case of a previous EOFException
+                       if (output.position() != 0) {
+                               throw new IllegalStateException("The Kryo 
Output still contains data from a previous " +
+                                       "serialize call. It has to be flushed 
or cleared at the end of the serialize call.");
+                       }
+
+                       try {
+                               kryo.writeClassAndObject(output, record);
+                               output.flush();
+                       }
+                       catch (KryoException ke) {
+                               // make sure that the Kryo output buffer is 
cleared in case that we can recover from
+                               // the exception (e.g. EOFException which 
denotes buffer full)
+                               output.clear();
+
+                               Throwable cause = ke.getCause();
+                               if (cause instanceof EOFException) {
+                                       throw (EOFException) cause;
+                               }
+                               else {
+                                       throw ke;
+                               }
                        }
-                       else {
-                               throw ke;
+               }
+               finally {
+                       if (CONCURRENT_ACCESS_CHECK) {
+                               exitExclusiveThread();
                        }
                }
        }
@@ -242,26 +275,38 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
        @SuppressWarnings("unchecked")
        @Override
        public T deserialize(DataInputView source) throws IOException {
-               checkKryoInitialized();
-               if (source != previousIn) {
-                       DataInputViewStream inputStream = new 
DataInputViewStream(source);
-                       input = new NoFetchingInput(inputStream);
-                       previousIn = source;
+               if (CONCURRENT_ACCESS_CHECK) {
+                       enterExclusiveThread();
                }
 
                try {
-                       return (T) kryo.readClassAndObject(input);
-               } catch (KryoException ke) {
-                       Throwable cause = ke.getCause();
-
-                       if (cause instanceof EOFException) {
-                               throw (EOFException) cause;
-                       } else {
-                               throw ke;
+                       checkKryoInitialized();
+
+                       if (source != previousIn) {
+                               DataInputViewStream inputStream = new 
DataInputViewStream(source);
+                               input = new NoFetchingInput(inputStream);
+                               previousIn = source;
+                       }
+
+                       try {
+                               return (T) kryo.readClassAndObject(input);
+                       } catch (KryoException ke) {
+                               Throwable cause = ke.getCause();
+
+                               if (cause instanceof EOFException) {
+                                       throw (EOFException) cause;
+                               } else {
+                                       throw ke;
+                               }
+                       }
+               }
+               finally {
+                       if (CONCURRENT_ACCESS_CHECK) {
+                               exitExclusiveThread();
                        }
                }
        }
-       
+
        @Override
        public T deserialize(T reuse, DataInputView source) throws IOException {
                return deserialize(source);
@@ -269,13 +314,24 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
        @Override
        public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               checkKryoInitialized();
-               if(this.copyInstance == null){
-                       this.copyInstance = createInstance();
+               if (CONCURRENT_ACCESS_CHECK) {
+                       enterExclusiveThread();
                }
 
-               T tmp = deserialize(copyInstance, source);
-               serialize(tmp, target);
+               try {
+                       checkKryoInitialized();
+                       if (this.copyInstance == null){
+                               this.copyInstance = createInstance();
+                       }
+
+                       T tmp = deserialize(copyInstance, source);
+                       serialize(tmp, target);
+               }
+               finally {
+                       if (CONCURRENT_ACCESS_CHECK) {
+                               exitExclusiveThread();
+                       }
+               }
        }
        
        // 
--------------------------------------------------------------------------------------------
@@ -516,6 +572,27 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
        // For testing
        // 
--------------------------------------------------------------------------------------------
 
+       private void enterExclusiveThread() {
+               // We use a simple get, check, set here, rather than a CAS.
+               // We don't need lock-style correctness: this is only a 
sanity check, so we
+               // favor speed at the cost of some false negatives in this check.
+               Thread previous = currentThread;
+               Thread thisThread = Thread.currentThread();
+
+               if (previous == null) {
+                       currentThread = thisThread;
+               }
+               else if (previous != thisThread) {
+                       throw new IllegalStateException(
+                                       "Concurrent access to KryoSerializer. 
Thread 1: " + thisThread.getName() +
+                                                       " , Thread 2: " + 
previous.getName());
+               }
+       }
+
+       private void exitExclusiveThread() {
+               currentThread = null;
+       }
+
        @VisibleForTesting
        public Kryo getKryo() {
                checkKryoInitialized();

http://git-wip-us.apache.org/repos/asf/flink/blob/57ff6e89/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerDebugInitHelper.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerDebugInitHelper.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerDebugInitHelper.java
new file mode 100644
index 0000000..ac918d6
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerDebugInitHelper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Simple helper class to initialize the concurrency checks for tests.
+ *
+ * <p>The flag is automatically set to true when assertions are activated 
(tests)
+ * and can be set to true manually in other tests as well.
+ */
+@Internal
+class KryoSerializerDebugInitHelper {
+
+       /** This captures the initial setting after initialization. It is used 
to
+        * validate in tests that we never change the default to true. */
+       static final boolean INITIAL_SETTING;
+
+       /** The flag that is used to initialize the KryoSerializer's 
concurrency check flag. */
+       static boolean setToDebug = false;
+
+       static {
+               // capture the default setting, for tests
+               INITIAL_SETTING = setToDebug;
+
+               // if assertions are active, the check should be activated
+               //noinspection AssertWithSideEffects,ConstantConditions
+               assert setToDebug = true;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57ff6e89/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyCheckInactiveITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyCheckInactiveITCase.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyCheckInactiveITCase.java
new file mode 100644
index 0000000..522bf9e
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyCheckInactiveITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * A test that validates that the concurrency checks in the Kryo Serializer
+ * are not hard coded to active.
+ *
+ * <p>The debug initialization in the KryoSerializer happens together with 
class
+ * initialization (so that the check is as efficient as possible), which is why this test needs 
to
+ * run in a fresh JVM fork, and the JVM fork of this test should not be reused.
+ *
+ * <p><b>Important:</b> If you see this test fail and the initial settings are 
still
+ * correct, check the assumptions above (on fresh JVM fork).
+ */
+public class KryoSerializerConcurrencyCheckInactiveITCase {
+
+       // this sets the debug initialization back to its default, even if
+       // tests modify it by default (implicitly, by running with assertions enabled)
+       static {
+               KryoSerializerDebugInitHelper.setToDebug = 
KryoSerializerDebugInitHelper.INITIAL_SETTING;
+       }
+
+       /**
+        * This test checks that concurrent access is not detected by default, 
meaning that
+        * the thread concurrency checks are off by default.
+        */
+       @Test
+       public void testWithNoConcurrencyCheck() throws Exception {
+               boolean assertionError;
+               try {
+                       new 
KryoSerializerConcurrencyTest().testConcurrentUseOfSerializer();
+                       assertionError = false;
+               }
+               catch (AssertionError e) {
+                       assertionError = true;
+               }
+
+               assertTrue("testConcurrentUseOfSerializer() should have failed 
if " +
+                               "concurrency checks are off by default", 
assertionError);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57ff6e89/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
new file mode 100644
index 0000000..ca81fd4
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.core.testutils.CheckedThread;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+/**
+ * This tests that the {@link KryoSerializer} properly fails when accessed by 
two threads
+ * concurrently.
+ *
+ * <p><b>Important:</b> This test only works if assertions are activated (-ea) 
on the JVM
+ * when running tests.
+ */
+public class KryoSerializerConcurrencyTest {
+
+       @Test
+       public void testConcurrentUseOfSerializer() throws Exception {
+               final KryoSerializer<String> serializer = new 
KryoSerializer<>(String.class, new ExecutionConfig());
+
+               final BlockerSync sync = new BlockerSync();
+
+               final DataOutputView regularOut = new DataOutputSerializer(32);
+               final DataOutputView lockingOut = new LockingView(sync);
+
+               // this thread serializes and gets stuck there
+               final CheckedThread thread = new CheckedThread("serializer") {
+                       @Override
+                       public void go() throws Exception {
+                               serializer.serialize("a value", lockingOut);
+                       }
+               };
+
+               thread.start();
+               sync.awaitBlocker();
+
+               // this should fail with an exception
+               try {
+                       serializer.serialize("value", regularOut);
+                       fail("should have failed with an exception");
+               }
+               catch (IllegalStateException e) {
+                       // expected
+               }
+               finally {
+                       // release the thread that serializes
+                       sync.releaseBlocker();
+               }
+
+               // this propagates exceptions from the spawned thread
+               thread.sync();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static class LockingView extends DataOutputSerializer {
+
+               private final BlockerSync blocker;
+
+               LockingView(BlockerSync blocker) {
+                       super(32);
+                       this.blocker = blocker;
+               }
+
+               @Override
+               public void write(byte[] b, int off, int len) throws 
IOException {
+                       blocker.blockNonInterruptible();
+               }
+       }
+}

Reply via email to