Repository: incubator-samza Updated Branches: refs/heads/master 7d1a01c7b -> e0ce7ec00
SAMZA-59: support dropping messages on Serde failures Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/e0ce7ec0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/e0ce7ec0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/e0ce7ec0 Branch: refs/heads/master Commit: e0ce7ec00aa3aa5caae7ac8408bc7491b68cb438 Parents: 7d1a01c Author: Yan Fang <[email protected]> Authored: Mon Jul 14 09:41:55 2014 -0700 Committer: Yan Fang <[email protected]> Committed: Mon Jul 14 09:41:55 2014 -0700 ---------------------------------------------------------------------- .../0.7.0/jobs/configuration-table.html | 20 ++++++ .../org/apache/samza/config/TaskConfig.scala | 6 ++ .../apache/samza/container/SamzaContainer.scala | 21 ++++-- .../apache/samza/system/SystemConsumers.scala | 24 ++++++- .../samza/system/SystemConsumersMetrics.scala | 1 + .../apache/samza/system/SystemProducers.scala | 25 ++++++- .../samza/system/SystemProducersMetrics.scala | 1 + .../samza/system/TestSystemConsumers.scala | 74 ++++++++++++++++++-- .../samza/system/TestSystemProducers.scala | 69 ++++++++++++++++++ 9 files changed, 227 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/docs/learn/documentation/0.7.0/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/0.7.0/jobs/configuration-table.html b/docs/learn/documentation/0.7.0/jobs/configuration-table.html index 07cb83f..edcb74f 100644 --- a/docs/learn/documentation/0.7.0/jobs/configuration-table.html +++ b/docs/learn/documentation/0.7.0/jobs/configuration-table.html @@ -357,6 +357,26 @@ </tr> <tr> + <td class="property" id="task-drop-deserialization-errors">task.drop.deserialization.errors</td> + <td class="default"></td> + <td 
class="description"> + This property defines how the system handles messages that fail to deserialize. If set to true, the system will + skip those messages and keep running. If set to false, the system will throw an exception and fail the container. Default + is false. + </td> + </tr> + + <tr> + <td class="property" id="task-drop-serialization-errors">task.drop.serialization.errors</td> + <td class="default"></td> + <td class="description"> + This property defines how the system handles messages that fail to serialize. If set to true, the system will + drop those messages and keep running. If set to false, the system will throw an exception and fail the container. Default + is false. + </td> + </tr> + + <tr> <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th> </tr> http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala index 18a9510..8b881f2 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala @@ -33,6 +33,8 @@ object TaskConfig { val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class val CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory" // class name to use when sending offset checkpoints val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class" + val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // whether to drop messages that fail to deserialize instead of failing the container + val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // whether to drop messages that fail to serialize instead of failing the container implicit def
Config2Task(config: Config) = new TaskConfig(config) } @@ -70,4 +72,8 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) { def getCheckpointManagerFactory() = getOption(TaskConfig.CHECKPOINT_MANAGER_FACTORY) def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME) + + def getDropDeserialization = getOption(TaskConfig.DROP_DESERIALIZATION_ERROR) + + def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 356adbb..b303615 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -87,7 +87,7 @@ object SamzaContainer extends Logging { def apply(containerName: String, inputStreams: Set[SystemStreamPartition], config: Config) = { val containerPID = Util.getContainerPID - + info("Setting up Samza container: %s" format containerName) info("Samza container PID: %s" format containerPID) info("Using streams and partitions: %s" format inputStreams) @@ -282,17 +282,29 @@ object SamzaContainer extends Logging { info("Got offset manager: %s" format offsetManager) + val dropDeserializationError: Boolean = config.getDropDeserialization match { + case Some(dropError) => dropError.toBoolean + case _ => false + } + + val dropSerializationError: Boolean = config.getDropSerialization match { + case Some(dropError) => dropError.toBoolean + case _ => false + } + val consumerMultiplexer = new SystemConsumers( // TODO add config values for no new message timeout and max msgs per stream partition chooser = chooser, consumers = consumers, serdeManager 
= serdeManager, - metrics = systemConsumersMetrics) + metrics = systemConsumersMetrics, + dropDeserializationError = dropDeserializationError) val producerMultiplexer = new SystemProducers( producers = producers, serdeManager = serdeManager, - metrics = systemProducersMetrics) + metrics = systemProducersMetrics, + dropSerializationError = dropSerializationError) val listeners = config.getLifecycleListeners match { case Some(listeners) => { @@ -438,8 +450,7 @@ object SamzaContainer extends Logging { consumerMultiplexer = consumerMultiplexer, metrics = samzaContainerMetrics, windowMs = taskWindowMs, - commitMs = taskCommitMs - ) + commitMs = taskCommitMs) info("Samza container setup complete.") http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index b537046..9eb70f2 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -70,7 +70,14 @@ class SystemConsumers( * thread will sit in a tight loop polling every SystemConsumer over and * over again if no new messages are available. */ - noNewMessagesTimeout: Long = 10) extends Logging { + noNewMessagesTimeout: Long = 10, + + /** + * This parameter is to define how to deal with deserialization failure. If set to true, + * the task will skip the messages when deserialization fails. If set to false, the task + * will throw SamzaException and fail the container. + */ + dropDeserializationError: Boolean = false) extends Logging { /** * The buffer where SystemConsumers stores all incoming message envelopes. 
@@ -242,7 +249,20 @@ class SystemConsumers( incomingEnvelopes.foreach(envelope => { val systemStreamPartition = envelope.getSystemStreamPartition - buffer.update(serdeManager.fromBytes(envelope)) + val messageEnvelope = try { + Some(serdeManager.fromBytes(envelope)) + } catch { + case e: Exception if !dropDeserializationError => throw new SystemConsumersException("can not deserialize the message", e) + case ex: Throwable => { + debug("Deserialization fails: %s . Drop the error message" format ex) + metrics.deserializationError.inc + None + } + } + + if (!messageEnvelope.isEmpty) { + buffer.update(messageEnvelope.get) + } debug("Got message for: %s, %s" format (systemStreamPartition, envelope)) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala index d632314..b065ae6 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala @@ -27,6 +27,7 @@ import org.apache.samza.metrics.MetricsHelper class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper { val choseNull = newCounter("chose-null") val choseObject = newCounter("chose-object") + val deserializationError = newCounter("deserialization error") val systemPolls = scala.collection.mutable.Map[String, Counter]() val systemStreamPartitionFetchesPerPoll = scala.collection.mutable.Map[String, Counter]() val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]() http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala index 8fb36b3..928b47e 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala @@ -21,11 +21,19 @@ package org.apache.samza.system import org.apache.samza.serializers.SerdeManager import grizzled.slf4j.Logging +import org.apache.samza.SamzaException class SystemProducers( producers: Map[String, SystemProducer], serdeManager: SerdeManager, - metrics: SystemProducersMetrics = new SystemProducersMetrics) extends Logging { + metrics: SystemProducersMetrics = new SystemProducersMetrics, + + /** + * If set to true, Samza will drop the messages that have serialization errors + * and keep running. If set to false, Samza will throw the SamzaException + * to fail the container. Default is false. + */ + dropSerializationError: Boolean = false) extends Logging { def start { debug("Starting producers.") @@ -62,6 +70,19 @@ class SystemProducers( metrics.sends.inc metrics.sourceSends(source).inc - producers(envelope.getSystemStream.getSystem).send(source, serdeManager.toBytes(envelope)) + val bytesEnvelope = try { + Some(serdeManager.toBytes(envelope)) + } catch { + case e: Exception if !dropSerializationError => throw new SamzaException("can not serialize the message", e) + case ex: Throwable => { + debug("Serialization fails: %s . 
Drop the error message" format ex) + metrics.serializationError.inc + None + } + } + + if (!bytesEnvelope.isEmpty) { + producers(envelope.getSystemStream.getSystem).send(source, bytesEnvelope.get) + } } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala index 594dd51..49d9aef 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala @@ -29,6 +29,7 @@ class SystemProducersMetrics(val registry: MetricsRegistry = new MetricsRegistry val sends = newCounter("sends") val sourceFlushes = scala.collection.mutable.Map[String, Counter]() val sourceSends = scala.collection.mutable.Map[String, Counter]() + val serializationError = newCounter("serialization error") def registerSource(source: String) { sourceFlushes += source -> newCounter("%s-flushes" format source) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala index e1a4c15..97e65eb 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala @@ -24,6 +24,9 @@ import org.apache.samza.Partition import org.junit.Assert._ import org.junit.Test import org.apache.samza.system.chooser.MessageChooser +import 
org.apache.samza.system.chooser.DefaultChooser +import org.apache.samza.util.BlockingEnvelopeMap +import org.apache.samza.serializers._ class TestSystemConsumers { @Test @@ -68,7 +71,7 @@ class TestSystemConsumers { var started = 0 var stopped = 0 var registered = Map[SystemStreamPartition, String]() - + val consumer = Map(system -> new SystemConsumer { def start {} def stop {} @@ -82,15 +85,76 @@ class TestSystemConsumers { def stop = stopped += 1 def register(systemStreamPartition: SystemStreamPartition, offset: String) = registered += systemStreamPartition -> offset }, consumer, null) - + // it should throw a SystemConsumersException because system2 does not have a consumer var caughtRightException = false try { - consumers.register(systemStreamPartition2, "0") + consumers.register(systemStreamPartition2, "0") } catch { - case e: SystemConsumersException => caughtRightException = true - case _: Throwable => caughtRightException = false + case e: SystemConsumersException => caughtRightException = true + case _: Throwable => caughtRightException = false } assertTrue("suppose to throw SystemConsumersException, but apparently it did not", caughtRightException) } + + @Test + def testDroppingMsgOrThrowExceptionWhenSerdeFails() { + val system = "test-system" + val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1)) + val msgChooser = new DefaultChooser + val consumer = Map(system -> new SimpleConsumer) + val systemMessageSerdes = Map(system -> (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]]); + val serdeManager = new SerdeManager(systemMessageSerdes = systemMessageSerdes) + + // it should throw exceptions when the deserialization has error + val consumers = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = false) + consumers.register(systemStreamPartition, "0") + consumers.start + consumer(system).putBytesMessage + consumer(system).putStringMessage + + var caughtRightException = false + try 
{ + consumers.choose + } catch { + case e: SystemConsumersException => caughtRightException = true + case _: Throwable => caughtRightException = false + } + assertTrue("suppose to throw SystemConsumersException", caughtRightException); + consumers.stop + + // it should not throw exceptions when deserialization fails if dropDeserializationError is set to true + val consumers2 = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = true) + consumers2.register(systemStreamPartition, "0") + consumers2.start + consumer(system).putBytesMessage + consumer(system).putStringMessage + + var notThrowException = true; + try { + consumers2.choose + } catch { + case e: Throwable => notThrowException = false + } + + assertTrue("it should not throw any exception", notThrowException) + consumers2.stop + } + + /** + * a simple consumer that provides two extra methods: one puts a message in bytes format + * and the other puts a message in string format + */ + private class SimpleConsumer extends BlockingEnvelopeMap { + val systemStreamPartition = new SystemStreamPartition("test-system", "some-stream", new Partition(1)) + def putBytesMessage { + put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", "0", "test".getBytes())) + } + def putStringMessage { + put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", "1", "test")) + } + def start {} + def stop {} + def register { super.register(systemStreamPartition, "0") } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/test/scala/org/apache/samza/system/TestSystemProducers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemProducers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemProducers.scala new file mode 100644 index 0000000..45da7b6 --- /dev/null +++
b/samza-core/src/test/scala/org/apache/samza/system/TestSystemProducers.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system + +import org.junit.Assert._ +import org.junit.Test +import org.apache.samza.serializers._ +import org.apache.samza.SamzaException + +class TestSystemProducers { + + @Test + def testDroppingMsgOrThrowExceptionWhenSerdeFails() { + val system = "test-system" + val systemStream = new SystemStream(system, "stream1") + val systemMessageSerdes = Map(system -> (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]]); + val serdeManager = new SerdeManager(systemMessageSerdes = systemMessageSerdes) + val systemProducer = new SystemProducer { + def start {} + def stop {} + def register(source: String) {} + def send(source: String, envelope: OutgoingMessageEnvelope) {} + def flush(source: String) {} + } + val systemProducers = new SystemProducers(Map(system -> systemProducer), serdeManager, new SystemProducersMetrics, false) + systemProducers.register(system) + val outgoingEnvelopeCorrectMsg = new OutgoingMessageEnvelope (systemStream, "test") + val outgoingEnvelopeErrorMsg = new OutgoingMessageEnvelope (systemStream, 123) + 
systemProducers.send(system, outgoingEnvelopeCorrectMsg) + + var getCorrectException = false + try { + systemProducers.send(system, outgoingEnvelopeErrorMsg) + } catch { + case e: SamzaException => getCorrectException = true + case _: Throwable => getCorrectException = false + } + assertTrue(getCorrectException) + + val systemProducers2 = new SystemProducers(Map(system -> systemProducer), serdeManager, new SystemProducersMetrics, true) + systemProducers2.register(system) + systemProducers2.send(system, outgoingEnvelopeCorrectMsg) + + var notThrowException = true + try { + systemProducers2.send(system, outgoingEnvelopeErrorMsg) + } catch { + case _: Throwable => notThrowException = false + } + assertTrue(notThrowException) + } +} \ No newline at end of file
