-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51142/#review148780
-----------------------------------------------------------
Thanks for pulling it off! Two high-level comments:
a) I would prefer to stick w/ the same pattern for metric initialization used elsewhere in Samza, i.e. use the MetricsRegistry to create a metrics object extending MetricsHelper and pass that object into the SystemConsumer constructor;
b) I don't fully understand the use case that requires a partially ordered offset vector within the same partition, as implemented in MultiFileHdfsReader. Let's discuss in person.

samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java (line 50)
<https://reviews.apache.org/r/51142/#comment216284>

It would be better to wrap this logic inside the PartitionDescriptor class, so there is no need to expose (and leak) the delimiter used inside PartitionDescriptor. The same comment applies to offsets.

samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java (line 54)
<https://reviews.apache.org/r/51142/#comment216350>

It would be nicer to guarantee some semantic ordering among the files in the same partition when persisting the partition descriptor.

samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java (line 59)
<https://reviews.apache.org/r/51142/#comment216351>

Not sure what we are doing here. What ordering are we enforcing in this multi-file partition? I see that you are trying to turn the offsets into an offset vector over all files in the same partition. Why? Can we simplify it by making the offsets fully ordered within the partition, instead of partially ordered via an offset vector?

samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java (line 75)
<https://reviews.apache.org/r/51142/#comment216352>

Why do we want to use the offset vector here?

samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java (line 90)
<https://reviews.apache.org/r/51142/#comment216353>

Again, why? It is hard to understand without a concrete use case that requires partial ordering of offsets within the same partition.

samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala (line 66)
<https://reviews.apache.org/r/51142/#comment216354>

It would be nicer to make this conform to the Offspring style of config variable scoping, i.e. if the configuration is scoped to the HDFS consumer, use systems.%s.consumer.hdfs.buffer-capacity. I would suggest consulting Prateek, since he has been working on the Offspring config refactoring. For new config variables, "." should strictly be used as the delimiter between scopes, not as the delimiter between words.

samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala (line 78)
<https://reviews.apache.org/r/51142/#comment216355>

Same here: systems.%s.partitioner.group-pattern. Also, does this partitioner apply to other types of systems as well, or just to HDFS? Wouldn't it be better to name it systems.%s.hdfs-partitioner.*?

samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala (line 86)
<https://reviews.apache.org/r/51142/#comment216356>

This is a redundant definition of a config variable from the YarnConfig object. What's the intended relationship between HdfsConfig and YarnConfig? Besides, yarn.job.staging.directory is the metadata path for the whole job, whereas I assume the path in HdfsConfig is the metadata path for a specific SystemAdmin. I would recommend a per-system config here, which could default to YarnConfig.getJobStagingDirectory().

samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala (line 32)
<https://reviews.apache.org/r/51142/#comment216357>

Why can't we create the HdfsSystemConsumerMetrics here and just pass it in, as we do w/ HdfsSystemProducer? Which use case makes it necessary to pass in the MetricsRegistry directly?
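To make the "fully ordered" alternative from the MultiFileHdfsReader comments concrete, here is a minimal sketch, assuming a multi-file partition is read file by file in a fixed order. The class `MultiFileOffset`, its private `:` separator, and the `fileIndex:offsetInFile` string format are hypothetical and not part of the patch. Comparing the file index first makes any two offsets in the partition comparable, which a vector of independent per-file offsets does not; keeping the separator private also follows the suggestion not to leak delimiters out of PartitionDescriptor.

```java
import java.util.Objects;

/**
 * Hypothetical single, totally ordered offset for a partition made of several files.
 * Files are read one after another, so a position is (fileIndex, offsetInFile).
 */
final class MultiFileOffset implements Comparable<MultiFileOffset> {
  // Private so callers never depend on the serialized layout.
  private static final String SEPARATOR = ":";

  private final int fileIndex;
  private final long offsetInFile;

  MultiFileOffset(int fileIndex, long offsetInFile) {
    if (fileIndex < 0 || offsetInFile < 0) {
      throw new IllegalArgumentException("negative offset component");
    }
    this.fileIndex = fileIndex;
    this.offsetInFile = offsetInFile;
  }

  int fileIndex() { return fileIndex; }

  long offsetInFile() { return offsetInFile; }

  /** Encodes the offset as the single string that would be checkpointed. */
  String encode() {
    return fileIndex + SEPARATOR + offsetInFile;
  }

  /** Inverse of encode(); rejects anything that is not exactly two numeric parts. */
  static MultiFileOffset decode(String encoded) {
    String[] parts = encoded.split(SEPARATOR, -1);
    if (parts.length != 2) {
      throw new IllegalArgumentException("malformed offset: " + encoded);
    }
    // NumberFormatException is a subclass of IllegalArgumentException.
    return new MultiFileOffset(Integer.parseInt(parts[0]), Long.parseLong(parts[1]));
  }

  /** Lexicographic order: earlier files first, then position within the file. */
  @Override
  public int compareTo(MultiFileOffset other) {
    int byFile = Integer.compare(fileIndex, other.fileIndex);
    return byFile != 0 ? byFile : Long.compare(offsetInFile, other.offsetInFile);
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof MultiFileOffset)) {
      return false;
    }
    MultiFileOffset that = (MultiFileOffset) o;
    return fileIndex == that.fileIndex && offsetInFile == that.offsetInFile;
  }

  @Override
  public int hashCode() {
    return Objects.hash(fileIndex, offsetInFile);
  }
}
```

With this shape a checkpoint is one string per partition, and resuming means seeking to `offsetInFile` in file `fileIndex` of the persisted file list, which only works if that list has a stable order.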
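The naming scheme proposed for HdfsConfig (lines 66, 78, and 86) could look roughly like the sketch below, which uses a plain `Map<String, String>` as a stand-in for Samza's config object. The keys `systems.%s.consumer.hdfs.buffer-capacity`, `systems.%s.partitioner.group-pattern`, and `yarn.job.staging.directory` come from the review; the key `systems.%s.hdfs.staging-directory` and the class `HdfsConfigKeys` are hypothetical. "." separates scopes, "-" separates words, and the per-system staging directory falls back to the job-wide one.

```java
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch of scoped HDFS config keys.
 * "." separates scopes; "-" separates words within a scope.
 */
final class HdfsConfigKeys {
  // Keys suggested in the review; %s is the system name.
  static final String CONSUMER_BUFFER_CAPACITY = "systems.%s.consumer.hdfs.buffer-capacity";
  static final String PARTITIONER_GROUP_PATTERN = "systems.%s.partitioner.group-pattern";
  // Hypothetical per-system key; not defined by the patch.
  static final String SYSTEM_STAGING_DIRECTORY = "systems.%s.hdfs.staging-directory";
  // Job-wide key that YarnConfig already defines.
  static final String JOB_STAGING_DIRECTORY = "yarn.job.staging.directory";

  private final Map<String, String> config;

  HdfsConfigKeys(Map<String, String> config) {
    this.config = config;
  }

  /** Buffer capacity for one system's HDFS consumer; the caller supplies the default. */
  int bufferCapacity(String systemName, int defaultCapacity) {
    String value = config.get(String.format(CONSUMER_BUFFER_CAPACITY, systemName));
    return value == null ? defaultCapacity : Integer.parseInt(value.trim());
  }

  /** Grouping pattern for the partitioner of one system, if configured. */
  Optional<String> groupPattern(String systemName) {
    return Optional.ofNullable(config.get(String.format(PARTITIONER_GROUP_PATTERN, systemName)));
  }

  /** Per-system staging directory, falling back to the job-wide staging directory. */
  Optional<String> stagingDirectory(String systemName) {
    String perSystem = config.get(String.format(SYSTEM_STAGING_DIRECTORY, systemName));
    return Optional.ofNullable(perSystem != null ? perSystem : config.get(JOB_STAGING_DIRECTORY));
  }
}
```

Keeping the fallback in one accessor means HdfsConfig would no longer need its own copy of the YarnConfig key definition for the common case.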
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala (line 36)
<https://reviews.apache.org/r/51142/#comment216358>

Not related to your RB, but could you open a JIRA for this one? Using the KafkaUtil class in HdfsSystemFactory seems really weird.

samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java (line 183)
<https://reviews.apache.org/r/51142/#comment216360>

It would be better to ensure that the index used in each partition refers to the sorted list from inputFiles, to guarantee consistent ordering/indexing of the same file in the list of input files. Is it possible that a new file is inserted in the middle of the sorted list? Maybe sort by creation time, so that new files are always appended at the greatest index of the sorted list?

samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala (line 127)
<https://reviews.apache.org/r/51142/#comment216361>

nit: unnecessary re-formatting

samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala (line 145)
<https://reviews.apache.org/r/51142/#comment216362>

nit: unnecessary re-formatting

samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala (line 165)
<https://reviews.apache.org/r/51142/#comment216363>

nit: unnecessary re-formatting

samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala (line 183)
<https://reviews.apache.org/r/51142/#comment216364>

nit: unnecessary re-formatting

samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala (line 204)
<https://reviews.apache.org/r/51142/#comment216365>

nit: unnecessary re-formatting

samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala (line 222)
<https://reviews.apache.org/r/51142/#comment216366>

nit: unnecessary re-formatting
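For the TestDirectoryPartitioner comment (line 183), a minimal sketch of stable indexing, assuming each listed file carries a creation timestamp: sorting oldest first, with the path as a tie-breaker, means a newly arrived file lands at the highest index while existing files keep theirs. `FileEntry`, `StableFileOrder`, and the `createTimeMillis` field are hypothetical; which HDFS file attribute would supply the timestamp is left open here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for one listed input file. */
final class FileEntry {
  final String path;
  final long createTimeMillis;

  FileEntry(String path, long createTimeMillis) {
    this.path = path;
    this.createTimeMillis = createTimeMillis;
  }
}

final class StableFileOrder {
  // Oldest first; the path breaks ties so the order is fully deterministic.
  static final Comparator<FileEntry> BY_CREATE_TIME_THEN_PATH = (a, b) -> {
    int byTime = Long.compare(a.createTimeMillis, b.createTimeMillis);
    return byTime != 0 ? byTime : a.path.compareTo(b.path);
  };

  /** Returns the paths in stable order; a file's list position is its index. */
  static List<String> indexedPaths(List<FileEntry> inputFiles) {
    List<FileEntry> sorted = new ArrayList<>(inputFiles); // leave the caller's list untouched
    sorted.sort(BY_CREATE_TIME_THEN_PATH);
    List<String> paths = new ArrayList<>();
    for (FileEntry f : sorted) {
      paths.add(f.path);
    }
    return paths;
  }
}
```

Sorting by name alone would put a newly added `/data/a.avro` at index 0 and shift every existing file, so an index persisted earlier would point at the wrong file.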
samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala (line 242)
<https://reviews.apache.org/r/51142/#comment216367>

nit: unnecessary re-formatting

samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala (line 260)
<https://reviews.apache.org/r/51142/#comment216368>

nit: unnecessary re-formatting

samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala (line 320)
<https://reviews.apache.org/r/51142/#comment216369>

nit: unnecessary re-formatting

samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala (line 338)
<https://reviews.apache.org/r/51142/#comment216370>

nit: unnecessary re-formatting

samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala (line 349)
<https://reviews.apache.org/r/51142/#comment216371>

nit: unnecessary re-formatting

- Yi Pan (Data Infrastructure)


On Sept. 9, 2016, 1:34 a.m., Hai Lu wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51142/
> -----------------------------------------------------------
>
> (Updated Sept. 9, 2016, 1:34 a.m.)
>
>
> Review request for samza, Chris Pettitt, Yi Pan (Data Infrastructure), and Navina Ramesh.
>
>
> Bugs: SAMZA-967
> https://issues.apache.org/jira/browse/SAMZA-967
>
>
> Repository: samza
>
>
> Description
> -------
>
> Add HDFS System Consumer:
>
> 1. System admin, partitioner
> 2. System consumer with metrics
>
> Design doc can be found here:
> https://issues.apache.org/jira/secure/attachment/12824078/HDFSSystemConsumer.pdf
>
> An overview of the high-level architecture:
>
> The system factory is used by Samza to instantiate the SystemConsumer, SystemProducer, and SystemAdmin for a specific system. The FileDataSystemFactory can be reused for other file-system-like sources.
>
> HDFSSystemAdmin will start a “DirectoryPartitioner” to figure out the set of HDFS files that need to be consumed by this job. The DirectoryPartitioner also uses a “GroupingPattern” to group files into partitions if advanced partitioning is required. HDFSSystemAdmin will then persist the “PartitionDescriptor” to HDFS.
>
> The HDFSSystemConsumer will then pick up the “PartitionDescriptor” from HDFS. Based on this information, as well as the actual assignment of partitions, it then knows which files to read from.
>
> The initial implementation of the HDFS system consumer supports only Avro data files. It’s very easy to extend it to a variety of file formats by implementing the FileReader interface.
>
> [Architecture diagram: HDFSSystemFactory creates HDFSSystemConsumer, HDFSSystemAdmin, and HDFSSystemProducer. HDFSSystemAdmin runs the DirectoryPartitioner (filtering/grouping of HDFS files) and persists the partition description to HDFS. HDFSSystemConsumer obtains the partition description from HDFS and reads through IFileReader, implemented by HDFSAvroFileReader. HDFSSystemProducer writes to HDFS through HDFSAvroWriter.]
>
>
> Diffs
> -----
>
> build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2
> gradle/dependency-versions.gradle 47c71bfde027835682889407261d4798b629d214
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java PRE-CREATION
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java PRE-CREATION
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptionUtil.java PRE-CREATION
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java PRE-CREATION
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java PRE-CREATION
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java PRE-CREATION
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java PRE-CREATION
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java PRE-CREATION
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java PRE-CREATION
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java PRE-CREATION
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 61b7570afae3219b618c8830905035063941bdd7
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 92eb4472533db67dca01f075cb460581b4bdac0d
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala ef3c20a097ddf2feecaf8b0ad4587ea4bf6570b7
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java PRE-CREATION
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptionUtil.java PRE-CREATION
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java PRE-CREATION
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java PRE-CREATION
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java PRE-CREATION
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java PRE-CREATION
> samza-hdfs/src/test/resources/integTest/emptyTestFile PRE-CREATION
> samza-hdfs/src/test/resources/partitioner/testfile01 PRE-CREATION
> samza-hdfs/src/test/resources/partitioner/testfile02 PRE-CREATION
> samza-hdfs/src/test/resources/reader/TestEvent.avsc PRE-CREATION
> samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala 261310d03de204718621f601117f016da14841df
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala 4e328a5f8c2b496a71e36c106339b7af263c96c7
>
> Diff: https://reviews.apache.org/r/51142/diff/
>
>
> Testing
> -------
>
> Unit tests pass.
>
> Manually tested by writing a real HDFS Samza job and deploying it to a YARN cluster.
>
>
> Thanks,
>
> Hai Lu
>
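The filtering and grouping step described for the DirectoryPartitioner in the quoted request can be illustrated with a minimal sketch. It assumes a regular expression with a named capture group `id` decides both which files belong to the job (non-matching files are dropped) and which partition a file joins; `GroupingSketch` and this pattern convention are hypothetical and are not the patch's actual GroupingPattern format. Files keep their input order inside a partition, so passing an already sorted file list keeps per-partition indexes stable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical grouping step: files whose names yield the same "id" capture
 * end up in the same partition. Not the syntax of the patch's GroupingPattern.
 */
final class GroupingSketch {
  static List<List<String>> group(List<String> fileNames, Pattern pattern) {
    // TreeMap orders partitions by group id (lexicographic string order).
    Map<String, List<String>> byId = new TreeMap<>();
    for (String name : fileNames) {
      Matcher m = pattern.matcher(name);
      if (!m.matches()) {
        continue; // filtered out: not part of this job's input
      }
      // Input order is preserved within each partition.
      byId.computeIfAbsent(m.group("id"), k -> new ArrayList<>()).add(name);
    }
    return new ArrayList<>(byId.values());
  }
}
```

Deriving partitions from a deterministic key, rather than from listing order, is what lets the admin persist the partition description once and have every consumer agree on it.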