> On Dec. 8, 2014, 12:18 a.m., Martin Kleppmann wrote: > > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala, > > line 72 > > <https://reviews.apache.org/r/27649/diff/4/?file=762969#file762969line72> > > > > Logging is perhaps a bit verbose, some of it could be at debug level?
Done. > On Dec. 8, 2014, 12:18 a.m., Martin Kleppmann wrote: > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java, > > line 47 > > <https://reviews.apache.org/r/27649/diff/4/?file=762963#file762963line47> > > > > I assume ordering of properties and whitespace in this JSON key object > > are significant, since Kafka uses bytewise equality comparison of keys, is > > that right? In that case, could you document whatever convention you're > > using for a canonical encoding here? > > > > Just as a suggestion, an option would be to use a JSON array instead of > > a JSON object, so that at least the ordering is explicit. Also, a JSON > > object looks like it should be extensible (people expect to be able to add > > their own fields without affecting correctness), which is not the case. A > > JSON array would look sufficiently "unusual" that it would probably > > disabuse people of that notion. Array key seems more exact, so I'll switch to it. It's less human friendly, but in this case, that's probably a good thing. > On Dec. 8, 2014, 12:18 a.m., Martin Kleppmann wrote: > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java, > > line 96 > > <https://reviews.apache.org/r/27649/diff/4/?file=762965#file762965line96> > > > > Might be nicer for the mapping from CoordinatorStreamMessage to/from > > bytes is part of the CoordinatorStreamMessage class, rather than here? I prefer keeping serde separate from the object itself. I'll make this pluggable, but continue using JSON. > On Dec. 8, 2014, 12:18 a.m., Martin Kleppmann wrote: > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java, > > line 108 > > <https://reviews.apache.org/r/27649/diff/4/?file=762963#file762963line108> > > > > I assume you're using TreeMap to get ordering of fields? Might be worth > > a little comment to make that explicit. Negated by the comment above. - Chris ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27649/#review64189 ----------------------------------------------------------- On Nov. 13, 2014, 11:57 p.m., Chris Riccomini wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/27649/ > ----------------------------------------------------------- > > (Updated Nov. 13, 2014, 11:57 p.m.) > > > Review request for samza. > > > Bugs: SAMZA-448 > https://issues.apache.org/jira/browse/SAMZA-448 > > > Repository: samza > > > Description > ------- > > fix ordering of keys in coordinator stream message > > > add test for coordinator stream system producer > > > add coordinator system consumer test > > > add coordinator stream message tests > > > add kafka system admin create cooridnator stream test > > > remove SystemAdmin.createCoordinatorStream, and create a > CoordinatorStreamAdmin instead. > > > more feedback from yan > > > partial fix of yan's review comments > > > add logging info > > > make deletes actually work > > > add javadocs to mock coordinator stream classes. > > > delete old configs when job runner publishes new configs > > > add rewriting into job coordinator > > > make process job properly set only coordinator stream config > > > all tests pass > > > fix samza container performance test > > > explicitly flush all buffers when closing the kafka producer. fix stateful > task test. > > > fix kafka tests > > > all core tests work > > > fix test checkpoint tool by adding a mock coordinator consumer that dumps the > entire config > > > working on fixing checkpoint tool tests > > > fleshing out the coordinator stream message javadocs > > > remove duplicate code from coordinator system factory > > > add more javadocs. clean up todos inkafka system admin. > > > remove yarn.container.count from yarn config, but use it as a fallback to > job.container.count > > > add some docs and headers to the coodinator stream and system admin > > > refactoring to add coordinator stream system consumer > > > cleanup source in job runne > > > abstract coordinator system producer creation into a factory > > > add todos > > > config stream works > > > create coordinator stream in system admin > > > connecting job coordinator to job runner via coordinator stream > > > add util and logging methods > > > adding coordinator message and system producer wrapper > > > Diffs > ----- > > build.gradle 828bce9913db00161971607e4c9ac19c63cecb95 > samza-api/src/main/java/org/apache/samza/system/CoordinatorSystemAdmin.java > PRE-CREATION > samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java > 571c60631357ea9a0b4fa24e7253008619ef2f32 > > samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java > 38e313f3c39454110efd354e6ca025869fa930cd > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java > PRE-CREATION > samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala > ddc30af7c52d8a4d5c5de02f6757c040b1f31c93 > samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala > 3b6685e00837a4aaf809813e62b7e52823bc07a9 > samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala > 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02 > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala > c14f2f623bb4bae911dd3085ce428175930e4545 > > samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala > PRE-CREATION > samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala > 16345cd1c1354a0d25a0000d81a307dbe3abbe81 > > samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala > 6985af6e7cc0d408fa07fbac60141d1126323777 > samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala > 530255e5866bc49ec5ce1a0b7437470cd4e17010 > samza-core/src/main/scala/org/apache/samza/util/Util.scala > 1a67586eeec95dabfeb3b6881af9b3865c3029ca > > samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java > PRE-CREATION > samza-core/src/test/resources/test.properties > 9348c7de956ebf02f58a163dc6fb391a7e29ae64 > > samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala > af800dfeedbfea75abaac3f15fd53bc55b743daf > > samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala > 1eb0edab1bc792ccf8c503b03687284451ab0f34 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala > 5ac33ea36da451250655d9dd373692b964322b41 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala > 4ed5e881031e019d8df6de259cabb658820a3ba0 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala > a0e1ccbfe9dc4fd26ca6b30fc2d1348fb7d007e4 > > samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala > 5ceb1093a66cb57e298d4b3ccdd24845dbb41b58 > samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java > fa1d51b290013a3913d64884dc43907a76670849 > > samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala > 118f5eee22016db3b802c32fb26c5d72fa61f1a7 > > samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala > d589d762a18f9425aa8d8dd589011a151bcb59a4 > samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala > 03395e2efa0fec723e354177d06bfacf7d2a9215 > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala > 91aff3c5e0a2bcea45120d794371fca1c638ccfe > > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala > 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d > > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala > b0b6543856cb87888c5a719182ad9576b51bba1a > samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala > 24b11da06a69da734c85720ef39d65ee46d821d5 > > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala > 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4 > > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala > 81dea9d6d1921462b200c62dbdf016c0eb2f01b2 > > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala > cab5101c5c9e2a979bca545fa8046e93dcfe46e2 > > Diff: https://reviews.apache.org/r/27649/diff/ > > > Testing > ------- > > > Thanks, > > Chris Riccomini > >
