-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review142183
-----------------------------------------------------------



Thanks for pulling it off! I did the first round of review. If there is 
anything unclear, please let me know.


samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java 
(line 24)
<https://reviews.apache.org/r/48356/#comment207749>

    So, what's the plan for customizing CheckpointConfig for each different checkpointManagerFactory? Deriving a subclass from this one?
    
    Generally, I noticed that a set of configuration variables falls into this category:
    - configure a factory
    - a set of configuration variables that are only meaningful if factory = x
    
    It would be good to write up some documentation here to showcase how we are dealing with this type of configuration.
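    One way to express the "factory plus factory-specific variables" pattern is a base config class that only knows the factory key, with one subclass per factory that adds and validates its own keys. This is a minimal sketch, assuming config is a flat Map<String, String>. The class names are hypothetical and are not the classes in this patch. The keys follow Samza's existing task.checkpoint.factory / task.checkpoint.system naming.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the base class owns the factory key; subclasses own
// the variables that only make sense for one specific factory.
class CheckpointConfigSketch {
  static final String FACTORY = "task.checkpoint.factory";

  private final String factoryClass;

  CheckpointConfigSketch(String factoryClass) {
    this.factoryClass = factoryClass;
  }

  // Validates, then emits the base entries; subclasses add their own on top.
  Map<String, String> build() {
    validate();
    Map<String, String> config = new HashMap<>();
    config.put(FACTORY, factoryClass);
    return config;
  }

  void validate() {
    if (factoryClass == null || factoryClass.isEmpty()) {
      throw new IllegalStateException(FACTORY + " must be set");
    }
  }
}

// Only meaningful when the factory is the Kafka checkpoint manager factory.
class KafkaCheckpointConfigSketch extends CheckpointConfigSketch {
  static final String KAFKA_FACTORY =
      "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory";
  static final String SYSTEM = "task.checkpoint.system";

  private final String checkpointSystem;

  KafkaCheckpointConfigSketch(String checkpointSystem) {
    super(KAFKA_FACTORY);
    this.checkpointSystem = checkpointSystem;
  }

  @Override
  Map<String, String> build() {
    Map<String, String> config = super.build(); // runs the overridden validate()
    config.put(SYSTEM, checkpointSystem);
    return config;
  }

  @Override
  void validate() {
    super.validate();
    if (checkpointSystem == null || checkpointSystem.isEmpty()) {
      throw new IllegalStateException(SYSTEM + " is required by the Kafka checkpoint factory");
    }
  }
}
```

    With this shape, the base CheckpointConfig never has to know about task.checkpoint.system; only the Kafka-specific subclass does.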



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
(line 31)
<https://reviews.apache.org/r/48356/#comment207823>

    I wonder whether we can have a ConfigBuilder interface as the common interface for all modular config builders, such as JobConfig, TaskConfig, etc., and make SamzaConfigBuilder an implementation of ConfigBuilder that holds all the sub-builders?
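    A minimal sketch of that shape, purely hypothetical: the interface and method names below are illustrative, and config is modeled as a flat Map<String, String> rather than Samza's Config. Each module validates and emits its own keys, and the composite merges them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical common interface implemented by every modular builder
// (JobConfig, TaskConfig, SystemConfig, ...).
interface ConfigBuilder {
  // Checks only the variables owned by this module.
  void validate();

  // Emits this module's flat key/value pairs.
  Map<String, String> build();
}

// Hypothetical top-level builder that holds the sub-builders and merges them.
class SamzaConfigBuilder implements ConfigBuilder {
  private final List<ConfigBuilder> modules = new ArrayList<>();

  SamzaConfigBuilder add(ConfigBuilder module) {
    modules.add(module);
    return this;
  }

  @Override
  public void validate() {
    for (ConfigBuilder module : modules) {
      module.validate();
    }
  }

  @Override
  public Map<String, String> build() {
    validate();
    Map<String, String> merged = new HashMap<>();
    for (ConfigBuilder module : modules) {
      merged.putAll(module.build());
    }
    return merged;
  }
}
```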



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
(line 32)
<https://reviews.apache.org/r/48356/#comment207750>

    Add some javadoc here.



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
(line 49)
<https://reviews.apache.org/r/48356/#comment207751>

    It is great that you are creating modular configuration objects here now! I would take the chance to create JobConfig, TaskConfig, SerdeConfig, and StoreConfig as well.



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
(line 58)
<https://reviews.apache.org/r/48356/#comment207799>

    Here it seems that you are mandating that every config have jobName and taskClass. However, I believe those two are not the only mandatory configuration variables. I would suggest that we remove these from the constructor of ConfigBuilder and instead make it more modular, so that we can call: ConfigBuilder.getGenericConfigBuilder().jobConfig(jobFactory, jobName, jobId, coordinatorSystem).taskConfig(taskClass, taskInputs, ...)
    
    Then, finally, we can call build() to compile the complete configuration. build() can call validate() on each modular config class (e.g. JobConfig.validate()) and also validate the inter-dependencies between the modular config classes (e.g. systems used in JobConfig, StoreConfig, etc. must have a corresponding SystemConfig).
    
    P.S. Just saw Chris' comment earlier. I think that if we separate the ConfigBuilder into smaller modular sections, we can make the mandatory config variables constructor parameters of each section.
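    A rough sketch of the fluent, modular flow described above, with per-module checks followed by one cross-module check (every system referenced in task.inputs needs a SystemConfig). The class and method names are hypothetical; the keys follow Samza's existing job.name, task.class, task.inputs, and systems.<system>.samza.factory naming. task.inputs is deliberately not required to be non-empty, given the RegexConfigRewriter use case.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the modular, fluent builder suggested above.
// Each module validates its own variables; build() then checks the
// dependencies between modules.
class GenericConfigBuilderSketch {
  private String jobName;
  private String taskClass;
  private final List<String> taskInputs = new ArrayList<>(); // "system.stream"
  private final Map<String, String> systemFactories = new LinkedHashMap<>();

  GenericConfigBuilderSketch jobConfig(String jobName) {
    this.jobName = jobName;
    return this;
  }

  GenericConfigBuilderSketch taskConfig(String taskClass, String... inputs) {
    this.taskClass = taskClass;
    for (String input : inputs) {
      taskInputs.add(input);
    }
    return this;
  }

  GenericConfigBuilderSketch systemConfig(String system, String factoryClass) {
    systemFactories.put(system, factoryClass);
    return this;
  }

  Map<String, String> build() {
    // Per-module validation (would live in JobConfig.validate(), TaskConfig.validate(), ...).
    require(jobName != null, "job.name is required");
    require(taskClass != null, "task.class is required");

    // Cross-module validation: every system referenced by task.inputs needs a SystemConfig.
    for (String input : taskInputs) {
      int dot = input.indexOf('.');
      require(dot > 0, "task.inputs entry must look like system.stream: " + input);
      String system = input.substring(0, dot);
      require(systemFactories.containsKey(system), "No SystemConfig for system '" + system + "'");
    }

    Map<String, String> config = new HashMap<>();
    config.put("job.name", jobName);
    config.put("task.class", taskClass);
    config.put("task.inputs", String.join(",", taskInputs));
    systemFactories.forEach((system, factory) -> config.put("systems." + system + ".samza.factory", factory));
    return config;
  }

  private static void require(boolean condition, String message) {
    if (!condition) {
      throw new IllegalStateException(message);
    }
  }
}
```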



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
(line 71)
<https://reviews.apache.org/r/48356/#comment207800>

    



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
(line 107)
<https://reviews.apache.org/r/48356/#comment207801>

    How would this work with RegexConfigRewriter? We have a use case where the user leaves this empty and configures RegexConfigRewriter to fill it in at runtime. Are we requiring that this builder be called after RegexConfigRewriter is invoked?



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
(line 109)
<https://reviews.apache.org/r/48356/#comment207802>

    It seems that here you are trying to create a holistic configuration to describe each stream in task.inputs. I am curious what you have in mind for the organization of all the configs. Is it something like:
    TaskConfig -> task.inputs -> stream -> SystemStreamConfig? It would be good to have some documentation showing the modules in the config and the relationships between them.



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
(line 110)
<https://reviews.apache.org/r/48356/#comment207804>

    We have another config pattern here:
    - configuration variables for an entity (i.e. SystemStream in this case) are dispersed across multiple config modules (i.e. TaskConfig has task.inputs, SystemConfig has the serde names and the replication factors, SerdeConfig has the actual serde class name, etc.). There are more cases like this: the changelog in StoreConfig also has a SystemStreamConfig, which would be generated via StoreConfig.{key,msg}.serde and the corresponding SystemConfig. It would be really nice to call out these usage patterns of configuration variables and describe how the new modular design handles:
    - encapsulation of the variables
    - relationships between the config modules
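    To make the "dispersed across modules" point concrete, here is a hypothetical helper that assembles one SystemStream's key serde from three places: a stream-level override, the system-level default, and the serde registry. The key names are assumed to follow Samza's existing systems.<system>.streams.<stream>.samza.key.serde, systems.<system>.samza.key.serde, and serializers.registry.<name>.class conventions. It also treats "no serde configured" as pass-through, which is how the null-serde case reads from the code (see the SystemConfig comment).

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: one SystemStream's key serde is assembled from
// variables that live in different config modules:
//   stream-level override (SystemConfig) -> system default (SystemConfig)
//   -> serde name resolved to a class (SerdeConfig).
// An empty result means no serde is configured, i.e. pass-through.
class StreamSerdeResolver {
  private final Map<String, String> config;

  StreamSerdeResolver(Map<String, String> config) {
    this.config = config;
  }

  Optional<String> keySerdeClass(String system, String stream) {
    String serdeName = config.get(String.format("systems.%s.streams.%s.samza.key.serde", system, stream));
    if (serdeName == null) {
      serdeName = config.get(String.format("systems.%s.samza.key.serde", system));
    }
    if (serdeName == null) {
      return Optional.empty();
    }
    String serdeClass = config.get(String.format("serializers.registry.%s.class", serdeName));
    if (serdeClass == null) {
      // Cross-module dependency violated: SystemConfig names a serde that SerdeConfig does not define.
      throw new IllegalStateException("Serde '" + serdeName + "' is not registered");
    }
    return Optional.of(serdeClass);
  }
}
```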



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
(line 146)
<https://reviews.apache.org/r/48356/#comment207808>

    Actually, CheckpointConfig only has the factory name in it. I assume that 
only KafkaCheckpointConfig would have the task.checkpoint.system set?



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
(line 169)
<https://reviews.apache.org/r/48356/#comment207809>

    I would prefer to encapsulate the validation in JobConfig.validate(), 
TaskConfig.validate(), etc.



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
(line 240)
<https://reviews.apache.org/r/48356/#comment207813>

    Can we make this package-private? Since it is internal to ConfigBuilder, this method shouldn't be called outside ConfigBuilder.build().



samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java
 (line 55)
<https://reviews.apache.org/r/48356/#comment207816>

    



samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java (line 
19)
<https://reviews.apache.org/r/48356/#comment207846>

    I think that it would be OK to keep all config builders in the same 
org.apache.samza.config package.



samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java (line 
30)
<https://reviews.apache.org/r/48356/#comment207847>

    Recently, Jon added a double serde as well.



samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java (line 
65)
<https://reviews.apache.org/r/48356/#comment207848>

    



samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java
 (line 23)
<https://reviews.apache.org/r/48356/#comment207849>

    Is it true that all standalone Samza processors use this wildcard grouper? I would prefer to have a specialized standalone config builder for each different type of standalone usage. Same for the TaskNameGrouperFactory configuration variable.
    
    P.S. Also saw Chris' comment on not creating a base class without knowing what the common base is. Totally agree. Hence, it would make sense to name this class accurately, so that it is clear that it only implements a specific type of Samza job. The name "standalone" creates some confusion, since we also call the ZK-based implementation "standalone".



samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java (line 
38)
<https://reviews.apache.org/r/48356/#comment207850>

    nit: It would be good to document here what setting the key and msg serdes to null means. From the code, it seems to mean pass-through (no serde).



samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java
 (line 43)
<https://reviews.apache.org/r/48356/#comment207851>

    Do we handle broadcast streams in this grouper? If not, we should make it clear in the javadoc that this grouper won't work with broadcast streams. Or, better, fail the config validation if we find that this is the case.
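    A sketch of the "fail the config validation" option, assuming (per the question above) that the grouper does not handle broadcast inputs. The config keys follow Samza's job.systemstreampartition.grouper.factory and task.broadcast.inputs naming, and the factory class name is taken from this diff. The patch itself would presumably throw ConfigException; IllegalArgumentException keeps the sketch self-contained.

```java
import java.util.Map;

// Hypothetical validation check: reject broadcast inputs when the
// all-SSPs-to-one-task grouper is configured (assuming the grouper
// does not support them).
final class GrouperValidation {
  static final String GROUPER_FACTORY = "job.systemstreampartition.grouper.factory";
  static final String BROADCAST_INPUTS = "task.broadcast.inputs";
  static final String ALL_SSP_TO_SINGLE_TASK =
      "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory";

  private GrouperValidation() {}

  static void validate(Map<String, String> config) {
    String grouper = config.get(GROUPER_FACTORY);
    String broadcast = config.get(BROADCAST_INPUTS);
    if (ALL_SSP_TO_SINGLE_TASK.equals(grouper) && broadcast != null && !broadcast.trim().isEmpty()) {
      throw new IllegalArgumentException(
          "AllSspToSingleTaskGrouper does not support " + BROADCAST_INPUTS + ": " + broadcast);
    }
  }
}
```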



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 
86)
<https://reviews.apache.org/r/48356/#comment207862>

    In addition, I don't see a use case where the life-cycle of JobCoordinator 
is outside of the life-cycle of StreamProcessor. It seems better to instantiate 
the JobCoordinator object within StreamProcessor, instead of passing in an 
already created object in the public constructor.



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 
118)
<https://reviews.apache.org/r/48356/#comment207861>

    Seems like this should be a generic constructor, not just for standalone. 
Also, isn't it true that we should be able to figure out what 
JobCoordinatorFactory we should use to instantiate the JobCoordinator instance 
from the Config object? Something like:
    
    JobCoordinatorFactory.getFactory(config).getJobCoordinator(config) should 
be more generic.
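    A minimal sketch of the generic constructor, where StreamProcessor owns the coordinator's life-cycle and picks the factory from config. The interface shapes here are assumptions (they take a plain Map instead of Samza's Config, and may not match the JobCoordinator/JobCoordinatorFactory interfaces in this patch), and the config key job.coordinator.factory is hypothetical.

```java
import java.util.Map;

// Hypothetical interfaces; the real signatures in the patch may differ.
interface JobCoordinator {
  void start();
  void stop();
}

interface JobCoordinatorFactory {
  // Assumed config key naming the factory class.
  String FACTORY_KEY = "job.coordinator.factory";

  JobCoordinator getJobCoordinator(Map<String, String> config);

  // Picks the factory from config, so StreamProcessor needs only one generic constructor.
  static JobCoordinatorFactory getFactory(Map<String, String> config) {
    String className = config.get(FACTORY_KEY);
    if (className == null) {
      throw new IllegalArgumentException(FACTORY_KEY + " is not set");
    }
    try {
      return (JobCoordinatorFactory) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate " + className, e);
    }
  }
}

// Example factory that could be named in config.
class NoOpJobCoordinatorFactory implements JobCoordinatorFactory {
  @Override
  public JobCoordinator getJobCoordinator(Map<String, String> config) {
    return new JobCoordinator() {
      @Override public void start() {}
      @Override public void stop() {}
    };
  }
}

// Hypothetical StreamProcessor that creates and owns its coordinator.
class StreamProcessorSketch {
  private final JobCoordinator coordinator;

  StreamProcessorSketch(Map<String, String> config) {
    this.coordinator = JobCoordinatorFactory.getFactory(config).getJobCoordinator(config);
  }

  void start() { coordinator.start(); }

  void stop() { coordinator.stop(); }
}
```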



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 
130)
<https://reviews.apache.org/r/48356/#comment207865>

    Not sure whether we want to keep the JmxServer life-cycle within the StreamProcessor life-cycle. This could actually be shared across the whole JVM process and passed in to the StreamProcessor.



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 
138)
<https://reviews.apache.org/r/48356/#comment207866>

    nit: the order of operations in the comments is not consistent w/ the order 
of operations in the code.



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 
176)
<https://reviews.apache.org/r/48356/#comment207868>

    



samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
 (line 44)
<https://reviews.apache.org/r/48356/#comment207870>

    I think that we should make the javadoc clearer. If this StandaloneJobCoordinator is an implementation that defines:
    - config-based JobModel generation (i.e. configured via wildcard groupers)
    - no leader election
    
    then we should put these definitions in the javadoc here as well.



samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
 (line 70)
<https://reviews.apache.org/r/48356/#comment207873>

    I don't fully understand the "unfortunate" part of this comment. A fully fledged JobCoordinator needs to perform:
    - Leader election
    - Partition assignment (which includes SystemStreamPartitionGrouper)
    - Physical resource allocation (which includes the TaskNameGrouper, which groups tasks into a physical process (i.e. ContainerModel), and the locality manager, which reports the physical location of the tasks)
    
    Hence, LocalityManager and TaskNameGrouper are coupled via JobCoordinator anyway.
    
    Also, I'm not sure what you meant by "groupers should be a property... rather the component that handles task distribution". There are two different types of groupers that perform two different types of functions (partition assignment and resource allocation). And what do you mean by "not that of the job anymore"?



samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
 (line 28)
<https://reviews.apache.org/r/48356/#comment207875>

    nit: it would be nice to validate the config to check whether it truly is a standalone config.



samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala (line 80)
<https://reviews.apache.org/r/48356/#comment207877>

    Can we just name it job.coordinator.host-affinity.enabled? Not particularly 
a fan of creating a new config that is to be deprecated. ;)
    For non-YARN JobCoordinator, we can just fail the config validation stating 
that it is not supported yet, if that is the case.



samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 
<https://reviews.apache.org/r/48356/#comment207880>

    I think that we should deprecate this one in favor of job.coordinator.host-affinity.enabled. Maybe copy its value over and print a warning for now, then remove it completely later.
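    A sketch of that migration shim, assuming the existing YARN key is yarn.samza.host-affinity.enabled. The class name is hypothetical, and java.util.logging stands in for whatever logger the codebase uses. An explicitly set new key takes precedence over the deprecated one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical migration shim: honour the old YARN-specific key for now,
// copy it to the new generic key, and warn so users can migrate.
final class HostAffinityConfigMigration {
  static final String OLD_KEY = "yarn.samza.host-affinity.enabled";
  static final String NEW_KEY = "job.coordinator.host-affinity.enabled";
  private static final Logger LOG = Logger.getLogger(HostAffinityConfigMigration.class.getName());

  private HostAffinityConfigMigration() {}

  static Map<String, String> migrate(Map<String, String> config) {
    Map<String, String> result = new HashMap<>(config);
    String oldValue = config.get(OLD_KEY);
    if (oldValue != null) {
      LOG.warning(OLD_KEY + " is deprecated; use " + NEW_KEY + " instead.");
      // An explicitly set new key wins over the deprecated one.
      result.putIfAbsent(NEW_KEY, oldValue);
    }
    return result;
  }

  static boolean isHostAffinityEnabled(Map<String, String> config) {
    // Boolean.parseBoolean(null) is false, so "unset" means disabled.
    return Boolean.parseBoolean(migrate(config).get(NEW_KEY));
  }
}
```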


- Yi Pan (Data Infrastructure)


On July 13, 2016, 9:58 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated July 13, 2016, 9:58 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer 
> interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>
