-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51142/#review148780
-----------------------------------------------------------



Thanks for pulling it off! Two high-level comments: a) I would prefer to stick 
w/ the same pattern for metric initialization in Samza (i.e. use 
MetricsRegistry to create a metric object extending MetricsHelper and pass it 
into the constructor for SystemConsumer); b) I don't fully understand the use 
case that we need to use partial-ordered offset vector in the same partition as 
implemented in MultiFileHdfsReader. Let's discuss in person.


samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
 (line 50)
<https://reviews.apache.org/r/51142/#comment216284>

    It would be better to wrap this into the logic inside PartitionDescriptor 
class. Hence, no need to expose and leak out the delimiter used inside the 
PartitionDescriptor. Similar comments on offsets.



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
 (line 54)
<https://reviews.apache.org/r/51142/#comment216350>

    It would be nicer to guarantee some sementic ordering among the files in 
the same partition, when persisting the partition descriptor.



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
 (line 59)
<https://reviews.apache.org/r/51142/#comment216351>

    Not sure what are we doing here? What's the ordering that we are enforcing 
in this multi-file partition? I saw that you are trying to make the offsets as 
an offset vector on top of all files in the same partition. Why? Can we 
simplify it by making it full-ordered in the same partition instead of 
partial-ordered via an offset vector?



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
 (line 75)
<https://reviews.apache.org/r/51142/#comment216352>

    Not sure why do we want to use the offset vector???



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
 (line 90)
<https://reviews.apache.org/r/51142/#comment216353>

    Again, why??? It is hard to understand without a concrete use case that 
requires a partial ordering of offsets in the same partition?



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala (line 
66)
<https://reviews.apache.org/r/51142/#comment216354>

    It would be nicer to make it conforming to Offspring style of config 
variable scoping. i.e. if the scope of configuration is for hdfs consumer, use 
systems.%s.consumer.hdfs.buffer-capacity. I would suggest to consult Prateek 
since he has been working on the Offspring config refactoring. For new config 
variables, "." should strictly be used as deliminator between scopes, not as 
deliminator between words.



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala (line 
78)
<https://reviews.apache.org/r/51142/#comment216355>

    Same here. systems.%s.partitioner.group-pattern.
    
    Also, does this partitioner apply to other type of systems as well? Or just 
for HDFS? Won't it be better to name it systems.%s.hdfs-partitioner.* ?



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala (line 
86)
<https://reviews.apache.org/r/51142/#comment216356>

    This is the redundant definition of a config variable from YarnConfig 
object. What's the intended relationship between HdfsConfig and YarnConfig? 
Besides, yarn.job.staging.directory is the metadata config path for the whole 
job. I assume that HdfsConfig is the metadata config path for a specific 
SystemAdmin? I would recommend that we have a per system config here. It may be 
default to YarnConfig.getJobStagingDirectory().



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala 
(line 32)
<https://reviews.apache.org/r/51142/#comment216357>

    Why can't we create the HdfsSystemConsumerMetrics here and just passing in 
the HdfsSystemConsumerMetrics as we do w/ HdfsSystemProducer? Which use case 
makes it necessary to pass in the MetricsRegistry directly?



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala 
(line 36)
<https://reviews.apache.org/r/51142/#comment216358>

    Not related to your RB, but could you open a JIRA for this one? Using 
KafkaUtil class in HdfsSystemFactory seems really weird.



samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java
 (line 183)
<https://reviews.apache.org/r/51142/#comment216360>

    It would be better to ensure that the index used in each partition is 
referring to the sorted list from inputFiles, to guarantee the consistent 
ordering/indexing of the same file in the list of input files. Is it possible 
that a new file is inserted in the middle of the sorted list? Maybe sort by 
create time to ensure the files are always appended to the greatest index in 
the sorted list?



samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
 (line 127)
<https://reviews.apache.org/r/51142/#comment216361>

    nit: unnecessary re-formatting



samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
 (line 145)
<https://reviews.apache.org/r/51142/#comment216362>

    nit: unnecessary re-formatting



samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
 (line 165)
<https://reviews.apache.org/r/51142/#comment216363>

    nit: unnecessary re-formatting



samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
 (line 183)
<https://reviews.apache.org/r/51142/#comment216364>

    nit: unnecessary re-formatting



samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
 (line 204)
<https://reviews.apache.org/r/51142/#comment216365>

    nit: unnecessary re-formatting



samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
 (line 222)
<https://reviews.apache.org/r/51142/#comment216366>

    nit: unnecessary re-formatting



samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
 (line 242)
<https://reviews.apache.org/r/51142/#comment216367>

    nit: unnecessary re-formatting



samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
 (line 260)
<https://reviews.apache.org/r/51142/#comment216368>

    nit: unnecessary re-formatting



samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
 (line 320)
<https://reviews.apache.org/r/51142/#comment216369>

    nit: unnecessary re-formatting



samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
 (line 338)
<https://reviews.apache.org/r/51142/#comment216370>

    nit: unnecessary re-formatting



samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
 (line 349)
<https://reviews.apache.org/r/51142/#comment216371>

    nit: unnecessary re-formatting


- Yi Pan (Data Infrastructure)


On Sept. 9, 2016, 1:34 a.m., Hai Lu wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51142/
> -----------------------------------------------------------
> 
> (Updated Sept. 9, 2016, 1:34 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Yi Pan (Data Infrastructure), and 
> Navina Ramesh.
> 
> 
> Bugs: SAMZA-967
>     https://issues.apache.org/jira/browse/SAMZA-967
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Add HDFS System Consumer: 
> 
> 1. System admin, partitioner
> 2. System consumer with metrics
> 
> Design doc can be found here: 
> https://issues.apache.org/jira/secure/attachment/12824078/HDFSSystemConsumer.pdf
> 
> An overview of the high level architecture: 
> 
> The system factory is used by Samza to instantiate SystemConsumer, 
> SystemProducer, and SystemAdmin for a specific system. The 
> FileDataSystemFactory can be reused for other file system like sources. 
> 
> HDFSSystemAdmin will start a “DirectoryPartitioner” to figure out the set of 
> HDFS files need to be consumed for this job. The DirectoryPartitioner also 
> uses “GroupingPattern” to group files into partitions if advanced 
> partitioning is required. HDFSSystemAdmin will then persist the 
> “PartitionDescriptor” to HDFS.
> 
> The HDFSSystemConsumer will then pick up the “PartitionDescriptor” from HDFS. 
> Based on this information as well as the actual assignment of partitions, it 
> would then know which files to read from.
> 
> The initial implementation of the HDFS system consumer supports only avro 
> data files. It’s very easy to extend it to a variety of file format by 
> implementing the FileReader interface.
> 
>                                                                               
>                                         
>                              
> +------------------------------------------------------------------------------+
>          
>                              |                                                
>                               |         
>            +-----------------+                                     HDFS       
>                               |         
>            |   Obtain        |                                                
>                               |         
>            |  Partition      
> +------+----------------------^------+---------------------------------^-------+
>          
>            | Description            |                      |      |           
>                       |                 
>            |                        |                      |      |           
>                       |                 
>            |          +-------------v-------+              |      |       
> Filtering/                |                 
>            |          |                     |              |      +---+    
> Grouping                 +-----+           
>            |          | HDFSAvroFileReader  |              |          |       
>                             |           
>            |          |                     |    Persist   |          |       
>                             |           
>            |          +---------+-----------+   Partition  |          |       
>                             |           
>            |                    |              Description |   
> +------v--------------+         +----------+----------+
>            |                    |                          |   |              
>        |         |                     |
>            |          +---------+-----------+              |   |Directory 
> Partitioner|         |   HDFSAvroWriter    |
>            |          |     IFileReader     |              |   |              
>        |         |                     |
>            |          |                     |              |   
> +------+--------------+         +----------+----------+
>            |          +---------+-----------+              |          |       
>                             |           
>            |                    |                          |          |       
>                             |           
>            |                    |                          |          |       
>                             |           
>            |          +---------+-----------+            
> +-+----------+--------+               +----------+----------+
>            |          |                     |            |                    
>  |               |                     |
>            |          | HDFSSystemConsumer  |            |   HDFSSystemAdmin  
>  |               | HDFSSystemProducer  |
>            +---------->                     |            |                    
>  |               |                     |
>                       +---------+-----------+            
> +-----------+---------+               +----------+----------+
>                                 |                                    |        
>                             |           
>                                 
> +------------------------------------+------------------------------------+   
>         
>                                                                      |        
>                                         
>                              
> +---------------------------------------+--------------------------------------+
>          
>                              |                                                
>                               |         
>                              |                              HDFSSystemFactory 
>                               |         
>                              |                                                
>                               |         
>                              
> +------------------------------------------------------------------------------+
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   gradle/dependency-versions.gradle 47c71bfde027835682889407261d4798b629d214 
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java 
> PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
> PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptionUtil.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
>  PRE-CREATION 
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 
> 61b7570afae3219b618c8830905035063941bdd7 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 
> 92eb4472533db67dca01f075cb460581b4bdac0d 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
>  ef3c20a097ddf2feecaf8b0ad4587ea4bf6570b7 
>   
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
>  PRE-CREATION 
>   
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptionUtil.java
>  PRE-CREATION 
>   
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java
>  PRE-CREATION 
>   
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java
>  PRE-CREATION 
>   
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java
>  PRE-CREATION 
>   
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java
>  PRE-CREATION 
>   samza-hdfs/src/test/resources/integTest/emptyTestFile PRE-CREATION 
>   samza-hdfs/src/test/resources/partitioner/testfile01 PRE-CREATION 
>   samza-hdfs/src/test/resources/partitioner/testfile02 PRE-CREATION 
>   samza-hdfs/src/test/resources/reader/TestEvent.avsc PRE-CREATION 
>   
> samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
>  261310d03de204718621f601117f016da14841df 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala 
> 4e328a5f8c2b496a71e36c106339b7af263c96c7 
> 
> Diff: https://reviews.apache.org/r/51142/diff/
> 
> 
> Testing
> -------
> 
> unit tests pass.
> 
> manually tested by writing a real hdfs samza job and deploying to a yarn 
> cluster.
> 
> 
> Thanks,
> 
> Hai Lu
> 
>

Reply via email to