cameronlee314 commented on a change in pull request #1171: StreamConfig from 
scala to Java
URL: https://github.com/apache/samza/pull/1171#discussion_r329171490
 
 

 ##########
 File path: 
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
 ##########
 @@ -80,13 +79,15 @@ object OffsetManager extends Logging {
     offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) = {
     debug("Building offset manager for %s." format systemStreamMetadata)
 
+    val streamConfig = new StreamConfig(config)
+
     val offsetSettings = systemStreamMetadata
       .map {
         case (systemStream, systemStreamMetadata) =>
           // Get default offset.
-          val streamDefaultOffset = config.getDefaultStreamOffset(systemStream)
-          val systemDefaultOffset = new 
SystemConfig(config).getSystemOffsetDefault(systemStream.getSystem)
-          val defaultOffsetType = if (streamDefaultOffset.isDefined) {
+          val streamDefaultOffset = 
streamConfig.getDefaultStreamOffset(systemStream)
+          val systemDefaultOffset = new 
SystemConfig(streamConfig).getSystemOffsetDefault(systemStream.getSystem)
 
 Review comment:
   Minor: I don't think it matters right now, but for consistency, could you 
pass `config` when creating `SystemConfig` instead of passing the 
`streamConfig`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to