> On Feb. 10, 2014, 10:11 p.m., Chris Riccomini wrote:
> > 1. Clean up misc white space tabs (highlighted in red) in RB.
> > 2. Add a test.
> > 3. Update docs to describe new configuration
> > (docs/learn/documentation/0.7.0/jobs/configuration-table.html)

Items 1 and 3 are done; I still need to look at the tests.


> On Feb. 10, 2014, 10:11 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala,
> > line 245
> > <https://reviews.apache.org/r/17869/diff/1/?file=480625#file480625line245>
> >
> > Remove extra tab.

Done.


> On Feb. 10, 2014, 10:11 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/config/SystemConsumersConfig.scala,
> > line 3
> > <https://reviews.apache.org/r/17869/diff/1/?file=480624#file480624line3>
> >
> > Thought about this more. Rather than create a SystemConsumersConfig,
> > let's just move this setting into the existing TaskConfig class.
> >
> > For the name, let's keep it consistent with other config names:
> >
> >     task.drop.deserialization.errors

Done.


> On Feb. 10, 2014, 10:11 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala,
> > line 150
> > <https://reviews.apache.org/r/17869/diff/1/?file=480626#file480626line150>
> >
> > No need to define this here. Just use serdeErrorDrop directly below.

Done.


> On Feb. 10, 2014, 10:11 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala,
> > line 274
> > <https://reviews.apache.org/r/17869/diff/1/?file=480626#file480626line274>
> >
> > We can make this a little more compact and idiomatic-Scala with:
> >
> >     val messageEnvelope = try {
> >       Some(serdeManager.fromBytes(envelope))
> >     } catch {
> >       ...
> >       None
> >     }
> >
> >     if(messageEnvelope.isDefined) {
> >       messageEnvelope.get
> >     }

Done.


> On Feb. 10, 2014, 10:11 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala,
> > line 156
> > <https://reviews.apache.org/r/17869/diff/1/?file=480626#file480626line156>
> >
> > Move this to SystemConsumersMetrics. Rename to deserialization error,
> > since we're deserializing, not serializing here. Use standard metrics style
> > ("deserialization-errors").

Done.


> On Feb. 10, 2014, 10:11 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala,
> > line 246
> > <https://reviews.apache.org/r/17869/diff/1/?file=480625#file480625line246>
> >
> > Could you move this to right above where it's used (above the new
> > SystemConsumers call in SamzaContainer)?
> >
> > Using config.getDropIncomingMessagesOnSerdeFailure should work. I
> > wonder if it didn't work because SystemConsumers expects a boolean, and
> > this is returning an Option[Boolean]. You might try:
> >
> >     config.getDropIncomingMessagesOnSerdeFailure.getOrElse(false)

I am getting a compiler error with this change. I am not familiar with this
syntax or with Scala, so I need to do some research on what I am doing wrong
here.

[ant:scalac] /Users/dantonetti/work/samza/incubator-samza/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala:279: error: type mismatch;
[ant:scalac]  found   : Any
[ant:scalac]  required: Boolean
[ant:scalac] Error occurred in an application involving default arguments.
[ant:scalac]       serdeErrorDrop = dropOnSerdeError)
[ant:scalac]                        ^
[ant:scalac] one error found


- Daniel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17869/#review34108
-----------------------------------------------------------


On Feb. 10, 2014, 8:37 p.m., Daniel Antonetti wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17869/
> -----------------------------------------------------------
>
> (Updated Feb. 10, 2014, 8:37 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-59
>     https://issues.apache.org/jira/browse/SAMZA-59
>
>
> Repository: samza
>
>
> Description
> -------
>
> I have added a boolean flag (serdeErrorDrop) to SystemConsumers.scala, which
> will allow a configuration to drop bad incoming messages.
>
> I moved the message deserialization up before updateFetchMap().
>
> I tried to set up the configuration, but it was not working. I am getting a
> compiler error, and I could not find where the other configurations were
> set up.
>
> NOTE: I tried to make my change consistent with the rest of the code, but I
> am not too familiar with Scala.
>
>
> Diffs
> -----
>
>   samza-core/src/main/scala/org/apache/samza/config/SystemConsumersConfig.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 13421d2
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala cdba7fe
>
> Diff: https://reviews.apache.org/r/17869/diff/
>
>
> Testing
> -------
>
> I tested setting dropOnSerdeError to both true and false, and both seemed to
> work. However, the configuration was not working, and I could not figure out
> how to get it working.
>
>
> Thanks,
>
> Daniel Antonetti
>
>
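
For reference, here is a minimal sketch of how a "found: Any, required: Boolean" mismatch like the one reported above can arise, and how the suggested getOrElse(false) and try/Option patterns fit together. It is an illustration under assumptions, not the Samza code: readFlagAsString, readFlagAsBoolean, fromBytes, and deserialize are hypothetical names, and a plain Map stands in for the job config. One possible cause of the error is calling getOrElse(false) on an Option whose element type is not Boolean (for example Option[String]), in which case the compiler widens the result to Any. Whether that is what happens in this patch cannot be confirmed from the thread.

```scala
import scala.util.control.NonFatal

object DropFlagSketch {
  // Key name as proposed in the review; the Map is only a stand-in for the job config.
  val DropKey = "task.drop.deserialization.errors"

  // Hypothetical accessor that returns the raw string value.
  def readFlagAsString(config: Map[String, String]): Option[String] =
    config.get(DropKey)

  // Hypothetical accessor that converts to Boolean before returning.
  def readFlagAsBoolean(config: Map[String, String]): Option[Boolean] =
    config.get(DropKey).map(_.toBoolean)

  // Hypothetical deserializer: fails on an empty payload.
  def fromBytes(bytes: Array[Byte]): String = {
    require(bytes.nonEmpty, "empty payload")
    new String(bytes, "UTF-8")
  }

  // Some(message) on success; None when the failure is dropped.
  // With dropOnError = false the guard does not match, so the exception propagates.
  def deserialize(bytes: Array[Byte], dropOnError: Boolean): Option[String] =
    try {
      Some(fromBytes(bytes))
    } catch {
      case NonFatal(_) if dropOnError => None
    }

  def main(args: Array[String]): Unit = {
    val config = Map(DropKey -> "true")

    // Option[String].getOrElse(false) is inferred as Any, so this would not compile:
    //   val bad: Boolean = readFlagAsString(config).getOrElse(false)
    val widened: Any = readFlagAsString(config).getOrElse(false)

    // Option[Boolean].getOrElse(false) is a Boolean and can be passed on directly.
    val dropOnError: Boolean = readFlagAsBoolean(config).getOrElse(false)

    println(deserialize("hello".getBytes("UTF-8"), dropOnError)) // prints Some(hello)
    println(deserialize(Array.empty[Byte], dropOnError))         // prints None
  }
}
```

If the accessor in the patch returns something other than Option[Boolean], converting the value to Boolean before calling getOrElse(false), as readFlagAsBoolean does here, is one way to keep the result typed as Boolean.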
