> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java, line 57
> > <https://reviews.apache.org/r/27649/diff/1/?file=760109#file760109line57>
> >
> >     a little concerned about adding this method in the interface.
> >     1. it breaks backwards compatibility for all existing systems, though
> > we only have KafkaSystem and FileSystem.
> >     2. it "implies" all systemAdmins should implement this. Actually only a
> > few systemAdmins may be used to generate coordinatorStream.

Yea, I think you're right. I was considering adding a more specific SystemAdmin 
interface, like CoordinatorSystemAdmin, which would mix in this method. Does 
that sound good?
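
To make the proposal concrete, here is a rough sketch of the shape I have in 
mind. It is not the patch itself: the SystemAdmin below is a cut-down stand-in 
with a placeholder method, and createCoordinatorStream, the two example admin 
classes, and the createIfSupported helper are assumed names for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Cut-down stand-in for the existing interface; describe() is a placeholder method.
interface SystemAdmin {
    String describe();
}

// Hypothetical narrower interface: only systems that can host a coordinator
// stream implement it, so existing SystemAdmin implementations keep compiling.
interface CoordinatorSystemAdmin extends SystemAdmin {
    void createCoordinatorStream(String streamName);
}

// An admin with no coordinator stream support is left untouched.
class FileLikeSystemAdmin implements SystemAdmin {
    public String describe() { return "file-like"; }
}

// An admin that opts in by implementing the narrower interface.
class KafkaLikeSystemAdmin implements CoordinatorSystemAdmin {
    final Set<String> created = new HashSet<>();
    public String describe() { return "kafka-like"; }
    public void createCoordinatorStream(String streamName) { created.add(streamName); }
}

class CoordinatorStreams {
    // Returns true if the stream was created, false if the admin cannot host one.
    static boolean createIfSupported(SystemAdmin admin, String streamName) {
        if (admin instanceof CoordinatorSystemAdmin) {
            ((CoordinatorSystemAdmin) admin).createCoordinatorStream(streamName);
            return true;
        }
        return false;
    }
}
```

The caller checks for the narrower type instead of every admin carrying a 
method it may never need.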


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java,
> >  lines 56-57
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line56>
> >
> >     a little doc here ? I am still a little confused about what each map 
> > contains.

Done.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java,
> >  line 71
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line71>
> >
> >     what is this "user.name" for?

It tracks the username of the account that sent the coordinator stream message. 
This is mostly useful when we have control-job.sh, and developers are manually 
modifying their coordinator stream. It should help with debugging when some 
confusing message is in the coordinator stream.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java,
> >  line 77
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line77>
> >
> >     maybe some messages?

Decided to switch this to a warn(), and just setHost to "".
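
Roughly, the fallback looks like the sketch below. This is illustrative only: 
the MessageSource class, the HostLookup hook (there to make the failure path 
testable), and the use of java.util.logging are assumptions, not the code in 
the patch.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.logging.Logger;

// Hypothetical helper; names are illustrative, not taken from the patch.
class MessageSource {
    private static final Logger LOG = Logger.getLogger(MessageSource.class.getName());

    // Small hook so the lookup can be swapped out in a test.
    interface HostLookup {
        String lookup() throws UnknownHostException;
    }

    // Returns the host from the lookup, or "" (after logging a warning) if it fails.
    static String resolveHost(HostLookup lookup) {
        try {
            return lookup.lookup();
        } catch (UnknownHostException e) {
            LOG.warning("Unable to determine host for coordinator stream message: " + e);
            return "";
        }
    }

    // Host address of the local machine, or "" if it cannot be resolved.
    static String localHost() {
        return resolveHost(() -> InetAddress.getLocalHost().getHostAddress());
    }
}
```

A message with an empty host is still usable, which is why a warning seemed 
better than failing outright.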


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java,
> >  line 114
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line114>
> >
> >     the name of this method is too ambiguous.

Not quite sure what to call it. I tried getMessageValue.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java,
> >  line 119
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line119>
> >
> >     the name of this method is a little ambiguous. I was thinking it may 
> > put value into KeyMap.

Same as above.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java,
> >  line 156
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line156>
> >
> >     duplicate code in getValue()
> >     "Map<String, String>) messageMap.get("values")"

Good catch. Added getMessageValues().
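
For reference, the idea is simply to keep the cast in one place. A minimal 
sketch, assuming the message is backed by a map with a "values" entry as in 
the snippet you quoted; the class name and constructor here are made up for 
illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the message class, not the real CoordinatorStreamMessage.
class CoordinatorMessageSketch {
    private final Map<String, Object> messageMap = new HashMap<>();

    CoordinatorMessageSketch() {
        messageMap.put("values", new HashMap<String, String>());
    }

    // The only place that performs the unchecked cast.
    @SuppressWarnings("unchecked")
    Map<String, String> getMessageValues() {
        return (Map<String, String>) messageMap.get("values");
    }

    // Reads a single entry from the "values" map.
    String getMessageValue(String key) {
        return getMessageValues().get(key);
    }

    // Writes a single entry into the "values" map.
    void putMessageValue(String key, String value) {
        getMessageValues().put(key, value);
    }
}
```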


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java,
> >  line 199
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line199>
> >
> >     what does this "value" mean?

I switched to putMessageValue, and added a lot more docs in the 
CoordinatorStreamMessage's Javadoc section to explain this.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java,
> >  line 46
> > <https://reviews.apache.org/r/27649/diff/1/?file=760112#file760112line46>
> >
> >     remove "reads"?

Done.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java,
> >  line 91
> > <https://reviews.apache.org/r/27649/diff/1/?file=760112#file760112line91>
> >
> >     in the comment, it says "the earliest offset" while the method called 
> > is "getOldestOffset()". Can they use the same word ?

Done. Standardized on "oldest", since that's what we use for config as well. 
Agree this is confusing.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java,
> >  line 34
> > <https://reviews.apache.org/r/27649/diff/1/?file=760113#file760113line34>
> >
> >     remove "reads" here?

Done.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala, lines 
> > 41-42
> > <https://reviews.apache.org/r/27649/diff/1/?file=760115#file760115line41>
> >
> >     shall we follow the same naming fashion as JOB_COORDINATOR_SYSTEM and 
> > JOB_CONTAINER_COUNT ?

Good call. Done.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  line 59
> > <https://reviews.apache.org/r/27649/diff/1/?file=760117#file760117line59>
> >
> >     some javadocs here?

Done.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
> >  line 61
> > <https://reviews.apache.org/r/27649/diff/1/?file=760117#file760117line61>
> >
> >     do we want to have a metric such as JobCoordinatorMetric ?

Yes. I was planning to leave that for the JobCoordinator refactor, though.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala,
> >  lines 59-63
> > <https://reviews.apache.org/r/27649/diff/1/?file=760118#file760118line59>
> >
> >     a lot of duplicate code here. can we get rid of some of them?

Done.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala,
> >  line 57
> > <https://reviews.apache.org/r/27649/diff/1/?file=760120#file760120line57>
> >
> >     "coorindatorSystemConfig" -> "coordinatorSystemConfig" ? typo . lol

Ooof. My hands have the hardest time typing "coordinator". Fixed.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 155
> > <https://reviews.apache.org/r/27649/diff/1/?file=760123#file760123line155>
> >
> >     space

Done.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala,
> >  line 112
> > <https://reviews.apache.org/r/27649/diff/1/?file=760130#file760130line112>
> >
> >     this will throw exceptions when we have fewer than 3 brokers.

Yea, I ran into this while testing. I'm hesitant to default it to 1, though, 
since this is unsafe (don't want to lose coordinator stream). On the other 
hand, we default it to 3 in the KafkaSystemFactory anyway, so the only time 
this constructor's default value is used would be in a test. Switched to 1.
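
To spell out the two defaults, here is a small sketch. It is not the actual 
KafkaSystemAdmin or KafkaSystemFactory code: the class names are stand-ins, 
and the config key and the idea that the factory reads the value from config 
are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the admin: the no-arg constructor plays the role of the
// constructor default, which is only hit when the factory is bypassed (tests).
class KafkaAdminSketch {
    final int coordinatorReplicationFactor;

    KafkaAdminSketch() {
        this(1); // test-friendly default: works with a single broker
    }

    KafkaAdminSketch(int coordinatorReplicationFactor) {
        this.coordinatorReplicationFactor = coordinatorReplicationFactor;
    }
}

// Stand-in for the factory: always passes an explicit value, defaulting to 3.
class KafkaFactorySketch {
    // Hypothetical config key, used here only for illustration.
    static final String REPLICATION_KEY = "coordinator.replication.factor";

    static KafkaAdminSketch createAdmin(Map<String, String> config) {
        int replicationFactor = Integer.parseInt(config.getOrDefault(REPLICATION_KEY, "3"));
        return new KafkaAdminSketch(replicationFactor);
    }
}
```

Jobs created through the factory still get the safer value of 3, while a 
directly constructed admin in a test can run against one broker.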


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java, 
> > line 67
> > <https://reviews.apache.org/r/27649/diff/1/?file=760134#file760134line67>
> >
> >     space

Fixed.


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27649/#review61222
-----------------------------------------------------------


On Nov. 12, 2014, 6:50 p.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27649/
> -----------------------------------------------------------
> 
> (Updated Nov. 12, 2014, 6:50 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-448
>     https://issues.apache.org/jira/browse/SAMZA-448
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> make deletes actually work
> 
> 
> add javadocs to mock coordinator stream classes.
> 
> 
> delete old configs when job runner publishes new configs
> 
> 
> add rewriting into job coordinator
> 
> 
> make process job properly set only coordinator stream config
> 
> 
> all tests pass
> 
> 
> fix samza container performance test
> 
> 
> explicitly flush all buffers when closing the kafka producer. fix stateful 
> task test.
> 
> 
> fix kafka tests
> 
> 
> all core tests work
> 
> 
> fix test checkpoint tool by adding a mock coordinator consumer that dumps the 
> entire config
> 
> 
> working on fixing checkpoint tool tests
> 
> 
> fleshing out the coordinator stream message javadocs
> 
> 
> remove duplicate code from coordinator system factory
> 
> 
> add more javadocs. clean up todos in kafka system admin.
> 
> 
> remove yarn.container.count from yarn config, but use it as a fallback to 
> job.container.count
> 
> 
> add some docs and headers to the coordinator stream and system admin
> 
> 
> refactoring to add coordinator stream system consumer
> 
> 
> cleanup source in job runner
> 
> 
> abstract coordinator system producer creation into a factory
> 
> 
> add todos
> 
> 
> config stream works
> 
> 
> create coordinator stream in system admin
> 
> 
> connecting job coordinator to job runner via coordinator stream
> 
> 
> add util and logging methods
> 
> 
> adding coordinator message and system producer wrapper
> 
> 
> Diffs
> -----
> 
>   build.gradle 828bce9913db00161971607e4c9ac19c63cecb95
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 571c60631357ea9a0b4fa24e7253008619ef2f32
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 38e313f3c39454110efd354e6ca025869fa930cd
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ddc30af7c52d8a4d5c5de02f6757c040b1f31c93
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 3b6685e00837a4aaf809813e62b7e52823bc07a9
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala c14f2f623bb4bae911dd3085ce428175930e4545
>   samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 16345cd1c1354a0d25a0000d81a307dbe3abbe81
>   samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 6985af6e7cc0d408fa07fbac60141d1126323777
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 530255e5866bc49ec5ce1a0b7437470cd4e17010
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 98e92bc12f3e2827cdec02f1ce94d7e2314e4b4e
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 1a67586eeec95dabfeb3b6881af9b3865c3029ca
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java PRE-CREATION
>   samza-core/src/test/resources/test.properties 9348c7de956ebf02f58a163dc6fb391a7e29ae64
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala af800dfeedbfea75abaac3f15fd53bc55b743daf
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala a79eccaa8fc18d197b77f9363f1814fefc4ac40d
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 1eb0edab1bc792ccf8c503b03687284451ab0f34
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 5ac33ea36da451250655d9dd373692b964322b41
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4ed5e881031e019d8df6de259cabb658820a3ba0
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala a0e1ccbfe9dc4fd26ca6b30fc2d1348fb7d007e4
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 5ceb1093a66cb57e298d4b3ccdd24845dbb41b58
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java fa1d51b290013a3913d64884dc43907a76670849
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala 118f5eee22016db3b802c32fb26c5d72fa61f1a7
>   samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala d589d762a18f9425aa8d8dd589011a151bcb59a4
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 03395e2efa0fec723e354177d06bfacf7d2a9215
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 91aff3c5e0a2bcea45120d794371fca1c638ccfe
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b0b6543856cb87888c5a719182ad9576b51bba1a
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 24b11da06a69da734c85720ef39d65ee46d821d5
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 81dea9d6d1921462b200c62dbdf016c0eb2f01b2
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala cab5101c5c9e2a979bca545fa8046e93dcfe46e2
> 
> Diff: https://reviews.apache.org/r/27649/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>
