[FLINK-8879] [avro] Add concurrency check to Avro Serializer on DEBUG level.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be7c8959
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be7c8959
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be7c8959

Branch: refs/heads/master
Commit: be7c89596a3b9cd8805a90aaf32336ec2759a1f7
Parents: 57ff6e8
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 6 11:21:08 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 7 19:35:29 2018 +0100

----------------------------------------------------------------------
 .../formats/avro/typeutils/AvroSerializer.java  | 114 ++++++++++++++++---
 .../AvroSerializerDebugInitHelper.java          |  47 ++++++++
 ...erializerConcurrencyCheckInactiveITCase.java |  62 ++++++++++
 .../AvroSerializerConcurrencyTest.java          |  94 +++++++++++++++
 .../AvroSerializerSerializabilityTest.java      |  70 ++++++++++++
 .../flink-1.4-serializer-java-serialized        | Bin 0 -> 202 bytes
 6 files changed, 374 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/be7c8959/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index bc3369f..75f2988 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -39,6 +39,8 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -52,12 +54,24 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * (ReflectDatumReader / -Writer). The serializer instantiates them depending on
  * the class of the type it should serialize.
  *
+ * <p><b>Important:</b> This serializer is NOT THREAD SAFE, because it reuses the data encoders
+ * and decoders which have buffers that would be shared between the threads if used concurrently.
+ *
  * @param <T> The type to be serialized.
  */
 public class AvroSerializer<T> extends TypeSerializer<T> {
 
        private static final long serialVersionUID = 1L;
 
+       /** Logger instance. */
+       private static final Logger LOG = LoggerFactory.getLogger(AvroSerializer.class);
+
+       /** Flag whether to check for concurrent thread access (DEBUG logging or the debug init helper flag).
+        * Because this flag is static final, a value of 'false' allows the JIT compiler to eliminate
+        * the guarded code sections. */
+       private static final boolean CONCURRENT_ACCESS_CHECK =
+                       LOG.isDebugEnabled() || AvroSerializerDebugInitHelper.setToDebug;
+
        // -------- configuration fields, serializable -----------
 
        /** The class of the type that is serialized by this serializer. */
@@ -78,6 +92,9 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
        /** The serializer configuration snapshot, cached for efficiency. */
        private transient AvroSchemaSerializerConfigSnapshot configSnapshot;
 
+       /** The currently accessing thread; only set and checked when CONCURRENT_ACCESS_CHECK is true. */
+       private transient volatile Thread currentThread;
+
        // ------------------------------------------------------------------------
 
        /**
@@ -127,23 +144,56 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 
        @Override
        public void serialize(T value, DataOutputView target) throws IOException {
-               checkAvroInitialized();
-               this.encoder.setOut(target);
-               this.writer.write(value, this.encoder);
+               if (CONCURRENT_ACCESS_CHECK) {
+                       enterExclusiveThread();
+               }
+
+               try {
+                       checkAvroInitialized();
+                       this.encoder.setOut(target);
+                       this.writer.write(value, this.encoder);
+               }
+               finally {
+                       if (CONCURRENT_ACCESS_CHECK) {
+                               exitExclusiveThread();
+                       }
+               }
        }
 
        @Override
        public T deserialize(DataInputView source) throws IOException {
-               checkAvroInitialized();
-               this.decoder.setIn(source);
-               return this.reader.read(null, this.decoder);
+               if (CONCURRENT_ACCESS_CHECK) {
+                       enterExclusiveThread();
+               }
+
+               try {
+                       checkAvroInitialized();
+                       this.decoder.setIn(source);
+                       return this.reader.read(null, this.decoder);
+               }
+               finally {
+                       if (CONCURRENT_ACCESS_CHECK) {
+                               exitExclusiveThread();
+                       }
+               }
        }
 
        @Override
        public T deserialize(T reuse, DataInputView source) throws IOException {
-               checkAvroInitialized();
-               this.decoder.setIn(source);
-               return this.reader.read(reuse, this.decoder);
+               if (CONCURRENT_ACCESS_CHECK) {
+                       enterExclusiveThread();
+               }
+
+               try {
+                       checkAvroInitialized();
+                       this.decoder.setIn(source);
+                       return this.reader.read(reuse, this.decoder);
+               }
+               finally {
+                       if (CONCURRENT_ACCESS_CHECK) {
+                               exitExclusiveThread();
+                       }
+               }
        }
 
        // ------------------------------------------------------------------------
@@ -152,8 +202,19 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 
        @Override
        public T copy(T from) {
-               checkAvroInitialized();
-               return avroData.deepCopy(schema, from);
+               if (CONCURRENT_ACCESS_CHECK) {
+                       enterExclusiveThread();
+               }
+
+               try {
+                       checkAvroInitialized();
+                       return avroData.deepCopy(schema, from);
+               }
+               finally {
+                       if (CONCURRENT_ACCESS_CHECK) {
+                               exitExclusiveThread();
+                       }
+               }
        }
 
        @Override
@@ -163,8 +224,10 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 
        @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
-               T value = deserialize(source);
-               serialize(value, target);
+               // no concurrency checks here: serialize() and deserialize() do the checks, and the
+               // current check mechanism is not re-entrant (the inner exitExclusiveThread() clears
+               // the marker an outer enterExclusiveThread() would have set), so it adds no safety
+               serialize(deserialize(source), target);
        }
 
        // ------------------------------------------------------------------------
@@ -277,6 +340,31 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
                this.decoder = new DataInputDecoder();
        }
 
+       // --------------------------------------------------------------------------------------------
+       //  Concurrency checks
+       // --------------------------------------------------------------------------------------------
+
+       private void enterExclusiveThread() {
+               // we use a simple get / check / set here rather than a CAS: this is only a sanity check,
+               // not a lock, so we favor speed at the cost of some false negatives (two threads that
+               // read 'null' at the same time can both pass before either one sets the field)
+               Thread previous = currentThread;
+               Thread thisThread = Thread.currentThread();
+
+               if (previous == null) {
+                       currentThread = thisThread;
+               }
+               else if (previous != thisThread) {
+                       throw new IllegalStateException(
+                                       "Concurrent access to AvroSerializer. Thread 1: " + thisThread.getName() +
+                                                       ", Thread 2: " + previous.getName());
+               }
+       }
+
+       private void exitExclusiveThread() {
+               currentThread = null;
+       }
+
        // ------------------------------------------------------------------------
        //  Serializer Snapshots
        // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/be7c8959/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerDebugInitHelper.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerDebugInitHelper.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerDebugInitHelper.java
new file mode 100644
index 0000000..c657092
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerDebugInitHelper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Simple helper class to initialize the concurrency checks for tests.
+ *
+ * <p>The flag is automatically set to true when assertions are activated (tests)
+ * and can be set to true manually in other tests as well.
+ */
+@Internal
+class AvroSerializerDebugInitHelper {
+
+       /** The value of setToDebug before the assertion-based override in the static block. It is used to
+        * validate in tests that we never change the default to true. */
+       static final boolean INITIAL_SETTING;
+
+       /** The flag that is used to initialize the AvroSerializer's concurrency check flag. */
+       static boolean setToDebug = false;
+
+       static {
+               // capture the default setting, for tests
+               INITIAL_SETTING = setToDebug;
+
+               // the assignment inside the assert only runs when assertions are enabled (-ea)
+               //noinspection AssertWithSideEffects,ConstantConditions
+               assert setToDebug = true;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/be7c8959/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyCheckInactiveITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyCheckInactiveITCase.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyCheckInactiveITCase.java
new file mode 100644
index 0000000..9b98e44
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyCheckInactiveITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * A test that validates that the concurrency checks in the Avro Serializer
+ * are not hard coded to active.
+ *
+ * <p>The debug initialization in the AvroSerializer happens together with class
+ * initialization (which makes the check as cheap as possible), so this test needs to
+ * run in a fresh JVM fork, and the JVM fork of this test should not be reused.
+ *
+ * <p><b>Important:</b> If you see this test fail and the initial settings are still
+ * correct, check the assumption above (the test must run in a fresh JVM fork).
+ */
+public class AvroSerializerConcurrencyCheckInactiveITCase {
+
+       // reset the debug flag to its default value; with assertions enabled (as in tests),
+       // the helper's class initialization has already flipped it to true
+       static {
+               AvroSerializerDebugInitHelper.setToDebug = AvroSerializerDebugInitHelper.INITIAL_SETTING;
+       }
+
+       /**
+        * This test checks that concurrent access is not detected by default, meaning that
+        * the thread concurrency checks are off by default.
+        */
+       @Test
+       public void testWithNoConcurrencyCheck() throws Exception {
+               boolean assertionError;
+               try {
+                       new AvroSerializerConcurrencyTest().testConcurrentUseOfSerializer();
+                       assertionError = false;
+               }
+               catch (AssertionError e) {
+                       assertionError = true;
+               }
+
+               assertTrue("testConcurrentUseOfSerializer() should have failed 
if " +
+                               "concurrency checks are off by default", assertionError);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/be7c8959/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
new file mode 100644
index 0000000..aaa9b4b
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.core.testutils.CheckedThread;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+/**
+ * This tests that the {@link AvroSerializer} properly fails when accessed by two threads
+ * concurrently.
+ *
+ * <p><b>Important:</b> This test only works if assertions are activated (-ea) on the JVM
+ * when running tests.
+ */
+public class AvroSerializerConcurrencyTest {
+
+       @Test
+       public void testConcurrentUseOfSerializer() throws Exception {
+               final AvroSerializer<String> serializer = new AvroSerializer<>(String.class);
+
+               final BlockerSync sync = new BlockerSync();
+
+               final DataOutputView regularOut = new DataOutputSerializer(32);
+               final DataOutputView lockingOut = new LockingView(sync);
+
+               // this thread serializes into the blocking view and stays stuck until the blocker is released
+               final CheckedThread thread = new CheckedThread("serializer") {
+                       @Override
+                       public void go() throws Exception {
+                               serializer.serialize("a value", lockingOut);
+                       }
+               };
+
+               thread.start();
+               sync.awaitBlocker();
+
+               // a concurrent serialize() from this thread must fail with an IllegalStateException
+               try {
+                       serializer.serialize("value", regularOut);
+                       fail("should have failed with an exception");
+               }
+               catch (IllegalStateException e) {
+                       // expected
+               }
+               finally {
+                       // release the thread that serializes
+                       sync.releaseBlocker();
+               }
+
+               // this propagates exceptions from the spawned thread
+               thread.sync();
+       }
+
+       // ------------------------------------------------------------------------
+
+       private static class LockingView extends DataOutputSerializer {
+
+               private final BlockerSync blocker;
+
+               LockingView(BlockerSync blocker) {
+                       super(32);
+                       this.blocker = blocker;
+               }
+
+               @Override
+               public void writeInt(int v) throws IOException {
+                       blocker.blockNonInterruptible();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/be7c8959/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java
new file mode 100644
index 0000000..c15aa7c
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test that validates that the serialized form of the AvroSerializer is the same as in
+ * previous Flink versions.
+ *
+ * <p>While that is not strictly necessary for Flink to work, it improves the user experience
+ * in job upgrade situations.
+ */
+public class AvroSerializerSerializabilityTest {
+
+       private static final String RESOURCE_NAME = "flink-1.4-serializer-java-serialized";
+
+       @Test
+       public void testDeserializeSerializer() throws Exception {
+               final AvroSerializer<String> currentSerializer = new AvroSerializer<>(String.class);
+
+               try (ObjectInputStream in = new ObjectInputStream(
+                               getClass().getClassLoader().getResourceAsStream(RESOURCE_NAME))) {
+
+                       @SuppressWarnings("unchecked")
+                       AvroSerializer<String> deserialized = (AvroSerializer<String>) in.readObject();
+
+                       assertEquals(currentSerializer, deserialized);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utility to (re)generate the serialized serializer resource file read by the test
+       // ------------------------------------------------------------------------
+
+       public static void main(String[] args) throws Exception {
+               final AvroSerializer<String> serializer = new AvroSerializer<>(String.class);
+
+               final File file = new File("flink-formats/flink-avro/src/test/resources/" + RESOURCE_NAME).getAbsoluteFile();
+
+               try (FileOutputStream fos = new FileOutputStream(file);
+                               ObjectOutputStream out = new ObjectOutputStream(fos)) {
+
+                       out.writeObject(serializer);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/be7c8959/flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized
 
b/flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized
new file mode 100644
index 0000000..63fef0a
Binary files /dev/null and 
b/flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized
 differ

Reply via email to