Repository: flink Updated Branches: refs/heads/master fe0c3b539 -> 2eb2a0ef3
[FLINK-3338] [kafka] Use proper classloader when cloning the deserialization schema. This closes #1590 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2eb2a0ef Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2eb2a0ef Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2eb2a0ef Branch: refs/heads/master Commit: 2eb2a0ef352f75a65a45a5a247450ae61ae5ab17 Parents: fe0c3b5 Author: Stephan Ewen <se...@apache.org> Authored: Thu Feb 4 21:14:39 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Feb 5 14:07:42 2016 +0100 ---------------------------------------------------------------------- .../apache/flink/util/InstantiationUtil.java | 29 ++++++++++++++++++-- .../connectors/kafka/FlinkKafkaConsumer08.java | 3 +- .../kafka/internals/LegacyFetcher.java | 26 ++++++++++++------ .../kafka/testutils/MockRuntimeContext.java | 2 +- 4 files changed, 48 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2eb2a0ef/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 1c6896f..e2439ca 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -310,8 +310,33 @@ public final class InstantiationUtil { * @throws ClassNotFoundException */ public static <T extends Serializable> T clone(T obj) throws IOException, ClassNotFoundException { - final byte[] serializedObject = serializeObject(obj); - return deserializeObject(serializedObject, obj.getClass().getClassLoader()); + if (obj == null) { + return null; + } else { + return clone(obj, obj.getClass().getClassLoader()); + } + } + + /** + * Clones the given serializable object using Java serialization, using the given classloader to + * resolve the cloned classes. + * + * @param obj Object to clone + * @param classLoader The classloader to resolve the classes during deserialization. + * @param <T> Type of the object to clone + * + * @return Cloned object + * + * @throws IOException + * @throws ClassNotFoundException + */ + public static <T extends Serializable> T clone(T obj, ClassLoader classLoader) throws IOException, ClassNotFoundException { + if (obj == null) { + return null; + } else { + final byte[] serializedObject = serializeObject(obj); + return deserializeObject(serializedObject, classLoader); + } } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2eb2a0ef/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index 543e0ff..bdea37f 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -251,7 +251,8 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { } // create fetcher - fetcher = new LegacyFetcher(this.subscribedPartitions, props, getRuntimeContext().getTaskName()); + fetcher = new LegacyFetcher(this.subscribedPartitions, props, + getRuntimeContext().getTaskName(), getRuntimeContext().getUserCodeClassLoader()); // offset handling offsetHandler = new ZookeeperOffsetHandler(props); http://git-wip-us.apache.org/repos/asf/flink/blob/2eb2a0ef/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java index 164cbac..fe7f777 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java @@ -31,11 +31,12 @@ import kafka.message.MessageAndOffset; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; - import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.StringUtils; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Node; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; /** * This fetcher uses Kafka's low-level API to pull data from a specific @@ -70,6 +71,9 @@ public class LegacyFetcher implements Fetcher { /** The first error that occurred in a connection thread */ private final AtomicReference<Throwable> error; + + /** The classloader for dynamically loaded classes */ + private final ClassLoader userCodeClassloader; /** The partitions that the fetcher should read, with their starting offsets */ private Map<KafkaTopicPartitionLeader, Long> partitionsToRead; @@ -86,8 +90,13 @@ public class LegacyFetcher implements Fetcher { /** Flag to shot the fetcher down */ private volatile boolean running = true; - public LegacyFetcher(List<KafkaTopicPartitionLeader> partitions, Properties props, String taskName) { - this.config = checkNotNull(props, "The config properties cannot be null"); + public LegacyFetcher( + List<KafkaTopicPartitionLeader> partitions, Properties props, + String taskName, ClassLoader userCodeClassloader) { + + this.config = requireNonNull(props, "The config properties cannot be null"); + this.userCodeClassloader = requireNonNull(userCodeClassloader); + //this.topic = checkNotNull(topic, "The topic cannot be null"); this.partitionsToRead = new HashMap<>(); for (KafkaTopicPartitionLeader p: partitions) { @@ -200,7 +209,8 @@ public class LegacyFetcher implements Fetcher { FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]); - final KeyedDeserializationSchema<T> clonedDeserializer = InstantiationUtil.clone(deserializer); + final KeyedDeserializationSchema<T> clonedDeserializer = + InstantiationUtil.clone(deserializer, userCodeClassloader); SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config, broker, partitions, sourceContext, clonedDeserializer, lastOffsets); @@ -344,9 +354,9 @@ public class LegacyFetcher implements Fetcher { this.config = config; this.broker = broker; this.partitions = partitions; - this.sourceContext = checkNotNull(sourceContext); - this.deserializer = checkNotNull(deserializer); - this.offsetsState = checkNotNull(offsetsState); + this.sourceContext = requireNonNull(sourceContext); + this.deserializer = requireNonNull(deserializer); + this.offsetsState = requireNonNull(offsetsState); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/2eb2a0ef/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java index cd44236..17e2e6f 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java @@ -98,7 +98,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext { @Override public ClassLoader getUserCodeClassLoader() { - throw new UnsupportedOperationException(); + return getClass().getClassLoader(); } @Override