[FLINK-8879] [avro] Add concurrency check to Avro Serializer on DEBUG level.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be7c8959 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be7c8959 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be7c8959 Branch: refs/heads/master Commit: be7c89596a3b9cd8805a90aaf32336ec2759a1f7 Parents: 57ff6e8 Author: Stephan Ewen <se...@apache.org> Authored: Tue Mar 6 11:21:08 2018 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Mar 7 19:35:29 2018 +0100 ---------------------------------------------------------------------- .../formats/avro/typeutils/AvroSerializer.java | 114 ++++++++++++++++--- .../AvroSerializerDebugInitHelper.java | 47 ++++++++ ...erializerConcurrencyCheckInactiveITCase.java | 62 ++++++++++ .../AvroSerializerConcurrencyTest.java | 94 +++++++++++++++ .../AvroSerializerSerializabilityTest.java | 70 ++++++++++++ .../flink-1.4-serializer-java-serialized | Bin 0 -> 202 bytes 6 files changed, 374 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/be7c8959/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java index bc3369f..75f2988 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java @@ -39,6 +39,8 @@ import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecord; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -52,12 +54,24 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * (ReflectDatumReader / -Writer). The serializer instantiates them depending on * the class of the type it should serialize. * + * <p><b>Important:</b> This serializer is NOT THREAD SAFE, because it reuses the data encoders + * and decoders which have buffers that would be shared between the threads if used concurrently. + * * @param <T> The type to be serialized. */ public class AvroSerializer<T> extends TypeSerializer<T> { private static final long serialVersionUID = 1L; + /** Logger instance. */ + private static final Logger LOG = LoggerFactory.getLogger(AvroSerializer.class); + + /** Flag whether to check for concurrent thread access. + * Because this flag is static final, a value of 'false' allows the JIT compiler to eliminate + * the guarded code sections. */ + private static final boolean CONCURRENT_ACCESS_CHECK = + LOG.isDebugEnabled() || AvroSerializerDebugInitHelper.setToDebug; + // -------- configuration fields, serializable ----------- /** The class of the type that is serialized by this serializer. */ @@ -78,6 +92,9 @@ public class AvroSerializer<T> extends TypeSerializer<T> { /** The serializer configuration snapshot, cached for efficiency. */ private transient AvroSchemaSerializerConfigSnapshot configSnapshot; + /** The currently accessing thread, set and checked on debug level only.
*/ + private transient volatile Thread currentThread; + // ------------------------------------------------------------------------ /** @@ -127,23 +144,56 @@ public class AvroSerializer<T> extends TypeSerializer<T> { @Override public void serialize(T value, DataOutputView target) throws IOException { - checkAvroInitialized(); - this.encoder.setOut(target); - this.writer.write(value, this.encoder); + if (CONCURRENT_ACCESS_CHECK) { + enterExclusiveThread(); + } + + try { + checkAvroInitialized(); + this.encoder.setOut(target); + this.writer.write(value, this.encoder); + } + finally { + if (CONCURRENT_ACCESS_CHECK) { + exitExclusiveThread(); + } + } } @Override public T deserialize(DataInputView source) throws IOException { - checkAvroInitialized(); - this.decoder.setIn(source); - return this.reader.read(null, this.decoder); + if (CONCURRENT_ACCESS_CHECK) { + enterExclusiveThread(); + } + + try { + checkAvroInitialized(); + this.decoder.setIn(source); + return this.reader.read(null, this.decoder); + } + finally { + if (CONCURRENT_ACCESS_CHECK) { + exitExclusiveThread(); + } + } } @Override public T deserialize(T reuse, DataInputView source) throws IOException { - checkAvroInitialized(); - this.decoder.setIn(source); - return this.reader.read(reuse, this.decoder); + if (CONCURRENT_ACCESS_CHECK) { + enterExclusiveThread(); + } + + try { + checkAvroInitialized(); + this.decoder.setIn(source); + return this.reader.read(reuse, this.decoder); + } + finally { + if (CONCURRENT_ACCESS_CHECK) { + exitExclusiveThread(); + } + } } // ------------------------------------------------------------------------ @@ -152,8 +202,19 @@ public class AvroSerializer<T> extends TypeSerializer<T> { @Override public T copy(T from) { - checkAvroInitialized(); - return avroData.deepCopy(schema, from); + if (CONCURRENT_ACCESS_CHECK) { + enterExclusiveThread(); + } + + try { + checkAvroInitialized(); + return avroData.deepCopy(schema, from); + } + finally { + if (CONCURRENT_ACCESS_CHECK) { +
exitExclusiveThread(); + } + } } @Override @@ -163,8 +224,10 @@ public class AvroSerializer<T> extends TypeSerializer<T> { @Override public void copy(DataInputView source, DataOutputView target) throws IOException { - T value = deserialize(source); - serialize(value, target); + // we do not have concurrency checks here, because serialize() and + // deserialize() do the checks and the current concurrency check mechanism + // does not provide additional safety in cases of re-entrant calls + serialize(deserialize(source), target); } // ------------------------------------------------------------------------ @@ -277,6 +340,31 @@ public class AvroSerializer<T> extends TypeSerializer<T> { this.decoder = new DataInputDecoder(); } + // -------------------------------------------------------------------------------------------- + // Concurrency checks + // -------------------------------------------------------------------------------------------- + + private void enterExclusiveThread() { + // we use simple get, check, set here, rather than CAS + // we don't need lock-style correctness, this is only a sanity-check and we thus + // favor speed at the cost of some false negatives in this check + Thread previous = currentThread; + Thread thisThread = Thread.currentThread(); + + if (previous == null) { + currentThread = thisThread; + } + else if (previous != thisThread) { + throw new IllegalStateException( + "Concurrent access to AvroSerializer.
Thread 1: " + thisThread.getName() + + " , Thread 2: " + previous.getName()); + } + } + + private void exitExclusiveThread() { + currentThread = null; + } + // ------------------------------------------------------------------------ // Serializer Snapshots // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/be7c8959/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerDebugInitHelper.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerDebugInitHelper.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerDebugInitHelper.java new file mode 100644 index 0000000..c657092 --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerDebugInitHelper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.annotation.Internal; + +/** + * Simple helper class to initialize the concurrency checks for tests. + * + * <p>The flag is automatically set to true when assertions are activated (tests) + * and can be set to true manually in other tests as well. + */ +@Internal +class AvroSerializerDebugInitHelper { + + /** This captures the default value of the flag, before the assertion below can change it. + * It is used to validate in tests that we never change the default to true. */ + static final boolean INITIAL_SETTING; + + /** The flag that is used to initialize the AvroSerializer's concurrency check flag. */ + static boolean setToDebug = false; + + static { + // capture the default setting, for tests + INITIAL_SETTING = setToDebug; + + // the assignment inside 'assert' only runs if assertions are enabled (-ea), activating the check + //noinspection AssertWithSideEffects,ConstantConditions + assert setToDebug = true; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/be7c8959/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyCheckInactiveITCase.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyCheckInactiveITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyCheckInactiveITCase.java new file mode 100644 index 0000000..9b98e44 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyCheckInactiveITCase.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +/** + * A test that validates that the concurrency checks in the Avro Serializer + * are not hard coded to active. + * + * <p>The debug initialization in the AvroSerializer happens together with class + * initialization (a static final flag, so the JIT can drop disabled checks), which is why this test needs to + * run in a fresh JVM fork, and the JVM fork of this test should not be reused. + * + * <p><b>Important:</b> If you see this test fail and the initial settings are still + * correct, check the assumptions above (a fresh, non-reused JVM fork). + */ +public class AvroSerializerConcurrencyCheckInactiveITCase { + + // this sets the debug initialization back to its default, even if + // tests modified it implicitly (the flag is set when assertions are enabled) + static { + AvroSerializerDebugInitHelper.setToDebug = AvroSerializerDebugInitHelper.INITIAL_SETTING; + } + + /** + * This test checks that concurrent access is not detected by default, meaning that + * the thread concurrency checks are off by default.
+ */ + @Test + public void testWithNoConcurrencyCheck() throws Exception { + boolean assertionError; + try { + new AvroSerializerConcurrencyTest().testConcurrentUseOfSerializer(); + assertionError = false; + } + catch (AssertionError e) { + assertionError = true; + } + + assertTrue("testConcurrentUseOfSerializer() should have failed if " + + "concurrency checks are off by default", assertionError); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/be7c8959/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java new file mode 100644 index 0000000..aaa9b4b --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.testutils.BlockerSync; +import org.apache.flink.core.testutils.CheckedThread; + +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.fail; + +/** + * This tests that the {@link AvroSerializer} properly fails when accessed by two threads + * concurrently. + * + * <p><b>Important:</b> This test only works if assertions are activated (-ea) on the JVM + * when running tests. + */ +public class AvroSerializerConcurrencyTest { + + @Test + public void testConcurrentUseOfSerializer() throws Exception { + final AvroSerializer<String> serializer = new AvroSerializer<>(String.class); + + final BlockerSync sync = new BlockerSync(); + + final DataOutputView regularOut = new DataOutputSerializer(32); + final DataOutputView lockingOut = new LockingView(sync); + + // this thread serializes and blocks in LockingView.writeInt() until released + final CheckedThread thread = new CheckedThread("serializer") { + @Override + public void go() throws Exception { + serializer.serialize("a value", lockingOut); + } + }; + + thread.start(); + sync.awaitBlocker(); + + // the other thread is blocked mid-serialization, so this must throw IllegalStateException + try { + serializer.serialize("value", regularOut); + fail("should have failed with an exception"); + } + catch (IllegalStateException e) { + // expected + } + finally { + // release the thread that serializes + sync.releaseBlocker(); + } + + // this propagates exceptions from the spawned thread + thread.sync(); + } + + // ------------------------------------------------------------------------ + + private static class LockingView extends DataOutputSerializer { + + private final BlockerSync blocker; + + LockingView(BlockerSync blocker) { + super(32); + this.blocker = blocker; + } + + @Override + public void writeInt(int v) throws IOException { + blocker.blockNonInterruptible(); + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/be7c8959/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java new file mode 100644 index 0000000..c15aa7c --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import static org.junit.Assert.assertEquals; + +/** + * Test that validates that the serialized form of the AvroSerializer is the same as in + * previous Flink versions. 
+ * + * <p>While that is not strictly necessary for Flink to work, it improves the user experience + * in job upgrade situations. + */ +public class AvroSerializerSerializabilityTest { + + private static final String RESOURCE_NAME = "flink-1.4-serializer-java-serialized"; + + @Test + public void testDeserializeSerializer() throws Exception { + final AvroSerializer<String> currentSerializer = new AvroSerializer<>(String.class); + + try (ObjectInputStream in = new ObjectInputStream( + getClass().getClassLoader().getResourceAsStream(RESOURCE_NAME))) { + + @SuppressWarnings("unchecked") + AvroSerializer<String> deserialized = (AvroSerializer<String>) in.readObject(); + + assertEquals(currentSerializer, deserialized); + } + } + + // ------------------------------------------------------------------------ + // Utility to (re-)generate the serialized serializer test resource + // ------------------------------------------------------------------------ + + public static void main(String[] args) throws Exception { + final AvroSerializer<String> serializer = new AvroSerializer<>(String.class); + + final File file = new File("flink-formats/flink-avro/src/test/resources/" + RESOURCE_NAME).getAbsoluteFile(); + + try (FileOutputStream fos = new FileOutputStream(file); + ObjectOutputStream out = new ObjectOutputStream(fos)) { + + out.writeObject(serializer); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/be7c8959/flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized b/flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized new file mode 100644 index 0000000..63fef0a Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized differ