-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51142/#review148612
-----------------------------------------------------------



I am still in the middle of the review, but I don't want to lose what I have so far.


samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java 
(line 47)
<https://reviews.apache.org/r/51142/#comment216064>

    nit: when you refer to class names, it would be better to use
{@link HdfsSystemAdmin}, {@link HdfsSystemConsumer}, etc.



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java 
(line 84)
<https://reviews.apache.org/r/51142/#comment216066>

    PartitionDescriptor or PartitionDescription? I have seen both used in the 
high-level design and the code. It would be better to choose one. It seems that 
PartitionDescriptor is what you intended?



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java 
(line 86)
<https://reviews.apache.org/r/51142/#comment216067>

    So the PartitionDescriptor is immutable? It would be better to note that here 
or in the javadoc of this variable.
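A minimal sketch of one way to both document and enforce immutability, assuming (hypothetically) that the descriptor maps a partition id to the list of file paths it covers. `ImmutablePartitionDescriptor` and the `Integer` key are illustrative stand-ins, not types from the patch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: maps a partition id to the HDFS file paths it covers.
 * Instances are immutable: the constructor takes defensive copies and the
 * accessor returns unmodifiable views, so callers cannot change the
 * descriptor after it has been persisted.
 */
final class ImmutablePartitionDescriptor {
  private final Map<Integer, List<String>> filesByPartition;

  ImmutablePartitionDescriptor(Map<Integer, List<String>> filesByPartition) {
    Map<Integer, List<String>> copy = new LinkedHashMap<>();
    for (Map.Entry<Integer, List<String>> e : filesByPartition.entrySet()) {
      // Copy each list so later changes to the caller's lists are not visible here.
      copy.put(e.getKey(), Collections.unmodifiableList(new ArrayList<>(e.getValue())));
    }
    this.filesByPartition = Collections.unmodifiableMap(copy);
  }

  /** Returns an unmodifiable view; attempts to modify it throw UnsupportedOperationException. */
  Map<Integer, List<String>> getFilesByPartition() {
    return filesByPartition;
  }
}
```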



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java 
(line 91)
<https://reviews.apache.org/r/51142/#comment216075>

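    You can use try-with-resources here:
    try (FSDataOutputStream fos = fs.create(targetPath)) {
      // assuming toJson() returns a String; FSDataOutputStream writes bytes
      fos.write(PartitionDescriptionUtil.toJson().getBytes(StandardCharsets.UTF_8));
    }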



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java 
(line 105)
<https://reviews.apache.org/r/51142/#comment216080>

    What if the PartitionDescriptor already exists? Could the 
systemStreamMetadata maintain a different copy of the PartitionDescriptor? It 
is not clear to me which one is the source of truth: 
directoryPartitioner.getPartitionMetadataMap() or 
directoryPartitioner.getPartitionDescriptor()? Maybe I am missing some basic 
information regarding the concept of PartitionDescriptor vs. 
PartitionMetadataMap?



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java 
(line 130)
<https://reviews.apache.org/r/51142/#comment216083>

    It would be nice to put an example offset string here to illustrate what we 
are comparing.
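To make the point concrete: suppose, hypothetically, that offsets were strings of the form `fileIndex:positionInFile`, such as `"3:1024"` (this format is an assumption, not confirmed by the patch). Comparing them as plain strings would then be wrong, because `"10:0"` sorts before `"9:0"` lexicographically. A sketch of a numeric comparison under that assumption:

```java
import java.util.Comparator;

/** Hypothetical comparator for offsets of the assumed form "fileIndex:positionInFile". */
final class HypotheticalOffsetComparator implements Comparator<String> {
  @Override
  public int compare(String a, String b) {
    long[] x = parse(a);
    long[] y = parse(b);
    // Compare the file index first, then the position within that file.
    int byFile = Long.compare(x[0], y[0]);
    return byFile != 0 ? byFile : Long.compare(x[1], y[1]);
  }

  private static long[] parse(String offset) {
    String[] parts = offset.split(":", 2);
    if (parts.length != 2) {
      throw new IllegalArgumentException("Unexpected offset: " + offset);
    }
    return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
  }
}
```

The same issue appears within a single file: as strings, `"2:5"` compares greater than `"2:40"`.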



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
(line 50)
<https://reviews.apache.org/r/51142/#comment216085>

    It would be nice to add some javadoc for this class.



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
(line 59)
<https://reviews.apache.org/r/51142/#comment216086>

    I would recommend adding some javadoc here to describe what is in 
partitionDescriptionMap. This is one of the key concepts in the design, and it 
would be nice to have the javadoc together with the code.



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
(line 60)
<https://reviews.apache.org/r/51142/#comment216087>

    So I assume you have one reader per partition? It would be nice to add that 
to the javadoc here as well. Also, why are the readers keyed only by 
Partition and not by stream name?
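On the keying question: if one consumer instance reads several HDFS streams, keying readers by partition alone lets partition 0 of one stream overwrite partition 0 of another. In Samza the natural key would be SystemStreamPartition. The sketch below uses a hypothetical `StreamPartitionKey` stand-in so it compiles without Samza, and a generic `R` for the reader type.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for Samza's SystemStreamPartition: (stream name, partition id). */
final class StreamPartitionKey {
  final String stream;
  final int partition;

  StreamPartitionKey(String stream, int partition) {
    this.stream = stream;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof StreamPartitionKey)) return false;
    StreamPartitionKey k = (StreamPartitionKey) o;
    return partition == k.partition && Objects.equals(stream, k.stream);
  }

  @Override
  public int hashCode() {
    return Objects.hash(stream, partition);
  }
}

/** One reader per (stream, partition); R is whatever reader type the consumer uses. */
final class ReaderRegistry<R> {
  private final Map<StreamPartitionKey, R> readers = new ConcurrentHashMap<>();

  /** Registers a reader and returns the one it replaced, or null. */
  R register(String stream, int partition, R reader) {
    return readers.put(new StreamPartitionKey(stream, partition), reader);
  }

  R get(String stream, int partition) {
    return readers.get(new StreamPartitionKey(stream, partition));
  }
}
```

Registering partition 0 for two different streams keeps both readers, which a Partition-only key would not.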



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
(line 61)
<https://reviews.apache.org/r/51142/#comment216096>

    nit: add javadoc here to explain how isShutdown is used.



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
(line 64)
<https://reviews.apache.org/r/51142/#comment216088>

    What is this one used for, metrics? If so, can we use metrics directly? 
Check KafkaSystemConsumerMetrics for a reference implementation.



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
(line 69)
<https://reviews.apache.org/r/51142/#comment216089>

    I would prefer to follow the same pattern as KafkaSystemConsumer, i.e., 
passing in the HdfsSystemConsumerMetrics object instead of the MetricsRegistry. 
Please check the code in KafkaSystemFactory.getConsumer() to see how the 
metrics object is created and passed along.
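A minimal sketch of that pattern, with JDK `AtomicLong` counters standing in for Samza's metrics classes so it compiles on its own: the factory builds one metrics object and hands it to the consumer, which only calls methods on it. The names `HdfsConsumerMetricsSketch`, `ConsumerSketch`, and `incEventsRead` are hypothetical. The real code would follow KafkaSystemConsumerMetrics and register its counters with the MetricsRegistry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical metrics holder: all counters live here instead of in the consumer. */
final class HdfsConsumerMetricsSketch {
  private final Map<String, AtomicLong> eventsRead = new ConcurrentHashMap<>();
  private final AtomicLong readErrors = new AtomicLong();

  /** Increments the per-partition "events read" counter, creating it on first use. */
  void incEventsRead(String partition) {
    eventsRead.computeIfAbsent(partition, p -> new AtomicLong()).incrementAndGet();
  }

  void incReadErrors() {
    readErrors.incrementAndGet();
  }

  long eventsRead(String partition) {
    AtomicLong c = eventsRead.get(partition);
    return c == null ? 0 : c.get();
  }

  long readErrors() {
    return readErrors.get();
  }
}

/** Hypothetical consumer that receives the metrics object instead of a registry. */
final class ConsumerSketch {
  private final HdfsConsumerMetricsSketch metrics;

  ConsumerSketch(HdfsConsumerMetricsSketch metrics) {
    this.metrics = metrics;
  }

  void onEvent(String partition) {
    metrics.incEventsRead(partition);
  }
}
```

In the factory this would amount to `new ConsumerSketch(new HdfsConsumerMetricsSketch())`, which keeps all metric names and registration in one place.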



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
(line 75)
<https://reviews.apache.org/r/51142/#comment216092>

    If you have a reason not to use ConcurrentHashMap for readers and 
partitionDescriptionMap, please state it here. After the multithreading change, 
we must make sure all new SystemConsumer and SystemProducer implementations are 
thread-safe.



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
(line 79)
<https://reviews.apache.org/r/51142/#comment216090>

    These should all be encapsulated in the HdfsSystemConsumerMetrics object.



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
(line 101)
<https://reviews.apache.org/r/51142/#comment216094>

    A better approach, to avoid wasteful remote IO when multiple threads call 
getPartitionDescriptor concurrently, is to use bucketized (per-entry) locks on 
the ConcurrentHashMap entries so that each hash map entry is populated only 
once. Guava's cache already implements this bucketized locking as a built-in 
feature: http://www.tutorialspoint.com/guava/guava_caching_utilities.htm
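If pulling in Guava is not desirable, a JDK-only alternative (a swapped-in technique, not the Guava cache linked above) is `ConcurrentHashMap.computeIfAbsent`, which runs the loading function at most once per key while other callers for the same key wait. In this sketch, `loadFromHdfs` is a hypothetical placeholder for the remote read of the descriptor. One caveat from the JDK documentation: other updates to the map may block while the function runs, so it should be short.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Caches descriptors per stream; the loader runs at most once per stream name. */
final class DescriptorCache<V> {
  private final Map<String, V> cache = new ConcurrentHashMap<>();
  private final Function<String, V> loadFromHdfs; // hypothetical remote loader

  DescriptorCache(Function<String, V> loadFromHdfs) {
    this.loadFromHdfs = loadFromHdfs;
  }

  V get(String streamName) {
    // Concurrent callers for the same key block until the first load finishes.
    return cache.computeIfAbsent(streamName, loadFromHdfs);
  }
}
```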



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
(line 118)
<https://reviews.apache.org/r/51142/#comment216093>

    It would be better to use putIfAbsent() to make sure updates to 
partitionDescriptionMap are thread-safe.
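A sketch of the putIfAbsent idiom. Two threads may both build a value, but only the first one is stored, and both callers end up using that same instance. `getOrInsert` is a hypothetical helper name.

```java
import java.util.concurrent.ConcurrentMap;

final class PutIfAbsentSketch {
  /**
   * Stores candidate only if no value exists for key, and returns whichever
   * value ended up in the map, so all callers share a single instance.
   */
  static <K, V> V getOrInsert(ConcurrentMap<K, V> map, K key, V candidate) {
    V existing = map.putIfAbsent(key, candidate);
    return existing != null ? existing : candidate;
  }
}
```

This guarantees a single stored value. It does not prevent two threads from both doing the remote read; avoiding that requires the per-key locking suggested in the previous comment.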



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
(line 142)
<https://reviews.apache.org/r/51142/#comment216098>

    Isn't it clearer to have one loop like below instead of two nested loops:
    while (!isShutdown) {
      if (!reader.hasNext()) {
        break;
      }
      IncomingMessageEnvelope messageEnvelope = reader.readNext();
      try {
        super.put(...);
        ...
      } catch (InterruptedException e) {
        ...
      }
    }
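A self-contained sketch of the single-loop shape, which also shows a documented `volatile` shutdown flag (per the earlier isShutdown comment). `Iterator<String>` stands in for the reader, and a `BlockingQueue` stands in for BlockingEnvelopeMap.put(). Both substitutions are assumptions made so the example runs without Samza.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;

/** Hypothetical reader loop: one loop that exits on shutdown or when input is exhausted. */
final class ReaderLoopSketch implements Runnable {
  /**
   * Set by stop() from another thread; volatile so the reading thread sees the
   * change on its next loop check.
   */
  private volatile boolean isShutdown = false;

  private final Iterator<String> reader;
  private final BlockingQueue<String> sink;

  ReaderLoopSketch(Iterator<String> reader, BlockingQueue<String> sink) {
    this.reader = reader;
    this.sink = sink;
  }

  void stop() {
    isShutdown = true;
  }

  @Override
  public void run() {
    while (!isShutdown) {
      if (!reader.hasNext()) {
        break;
      }
      String message = reader.next();
      try {
        sink.put(message); // blocks if the queue is full
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop reading.
        Thread.currentThread().interrupt();
        break;
      }
    }
  }
}
```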


- Yi Pan (Data Infrastructure)


On Sept. 9, 2016, 1:34 a.m., Hai Lu wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51142/
> -----------------------------------------------------------
> 
> (Updated Sept. 9, 2016, 1:34 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Yi Pan (Data Infrastructure), and 
> Navina Ramesh.
> 
> 
> Bugs: SAMZA-967
>     https://issues.apache.org/jira/browse/SAMZA-967
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Add HDFS System Consumer: 
> 
> 1. System admin, partitioner
> 2. System consumer with metrics
> 
> Design doc can be found here: 
> https://issues.apache.org/jira/secure/attachment/12824078/HDFSSystemConsumer.pdf
> 
> An overview of the high level architecture: 
> 
> The system factory is used by Samza to instantiate SystemConsumer, 
> SystemProducer, and SystemAdmin for a specific system. The 
> FileDataSystemFactory can be reused for other file-system-like sources.
> 
> HDFSSystemAdmin will start a “DirectoryPartitioner” to figure out the set of 
> HDFS files that need to be consumed by this job. The DirectoryPartitioner also 
> uses a “GroupingPattern” to group files into partitions if advanced 
> partitioning is required. HDFSSystemAdmin will then persist the 
> “PartitionDescriptor” to HDFS.
> 
> The HDFSSystemConsumer will then pick up the “PartitionDescriptor” from HDFS. 
> Based on this information, together with the actual assignment of partitions, 
> it knows which files to read.
> 
> The initial implementation of the HDFS system consumer supports only Avro 
> data files. It is straightforward to extend it to other file formats by 
> implementing the FileReader interface.
> 
> +----------------------------------------------------------------------------+
> |                                    HDFS                                    |
> +-+------------+----------------^---------+----------------------^-----------+
>   | Obtain     |        Persist |         | Filtering/           |
>   | Partition  |      Partition |         | Grouping             |
>   | Description|    Description |         |                      |
>   |  +---------v----------+     |   +-----v-----+      +---------+----------+
>   |  | HDFSAvroFileReader |     |   | Directory |      |   HDFSAvroWriter   |
>   |  +---------+----------+     |   |Partitioner|      +---------+----------+
>   |            |                |   +-----+-----+                |
>   |  +---------+----------+     |         |                      |
>   |  |    IFileReader     |     |         |                      |
>   |  +---------+----------+     |         |                      |
>   |            |                |         |                      |
>   |  +---------+----------+   +-+---------+------+     +---------+----------+
>   +->| HDFSSystemConsumer |   | HDFSSystemAdmin  |     | HDFSSystemProducer |
>      +---------+----------+   +--------+---------+     +---------+----------+
>                |                       |                         |
>                +-----------------------+-------------------------+
>                                        |
> +--------------------------------------+-------------------------------------+
> |                             HDFSSystemFactory                              |
> +----------------------------------------------------------------------------+
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2
>   gradle/dependency-versions.gradle 47c71bfde027835682889407261d4798b629d214
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptionUtil.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 61b7570afae3219b618c8830905035063941bdd7
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 92eb4472533db67dca01f075cb460581b4bdac0d
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala ef3c20a097ddf2feecaf8b0ad4587ea4bf6570b7
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptionUtil.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java PRE-CREATION
>   samza-hdfs/src/test/resources/integTest/emptyTestFile PRE-CREATION
>   samza-hdfs/src/test/resources/partitioner/testfile01 PRE-CREATION
>   samza-hdfs/src/test/resources/partitioner/testfile02 PRE-CREATION
>   samza-hdfs/src/test/resources/reader/TestEvent.avsc PRE-CREATION
>   samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala 261310d03de204718621f601117f016da14841df
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala 4e328a5f8c2b496a71e36c106339b7af263c96c7
> 
> Diff: https://reviews.apache.org/r/51142/diff/
> 
> 
> Testing
> -------
> 
> Unit tests pass.
> 
> Manually tested by writing a real HDFS Samza job and deploying it to a YARN 
> cluster.
> 
> 
> Thanks,
> 
> Hai Lu
> 
>
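The filtering and grouping step described in the quoted request can be sketched as follows. This sketch makes two hypothetical assumptions: a whitelist regex does the filtering, and a grouping regex's first capture group names the partition. `GroupingSketch`, `groupFiles`, and these regex conventions are illustrative only, not the DirectoryPartitioner API from the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class GroupingSketch {
  /**
   * Keeps file names matching whitelist, then groups them by the first capture
   * group of groupingPattern. Files that do not match the grouping pattern each
   * get their own group, keyed by the full file name.
   */
  static Map<String, List<String>> groupFiles(
      List<String> fileNames, Pattern whitelist, Pattern groupingPattern) {
    Map<String, List<String>> groups = new TreeMap<>(); // sorted keys give a stable partition order
    for (String name : fileNames) {
      if (!whitelist.matcher(name).matches()) {
        continue; // filtering step
      }
      Matcher m = groupingPattern.matcher(name);
      String key = (m.matches() && m.groupCount() >= 1) ? m.group(1) : name;
      groups.computeIfAbsent(key, k -> new ArrayList<>()).add(name);
    }
    return groups;
  }
}
```

Each resulting group would then become one partition in the descriptor, for example numbered in the map's sorted key order.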
