[ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362278#comment-16362278
 ] 

ASF GitHub Bot commented on FLINK-3655:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5415#discussion_r167850774
  
    --- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
 ---
    @@ -428,6 +431,78 @@ public void testDelimiterOnBufferBoundary() throws 
IOException {
                format.close();
        }
     
    +   // -- Statistics --//
    +
    +   @Test
    +   public void testGetStatistics() throws IOException {
    +           final String myString = "my mocked line 1\nmy mocked line 2\n";
    +           final long size = myString.length();
    +           final Path filePath = createTempFilePath(myString);
    +
    +           final String myString2 = "my mocked line 1\nmy mocked line 
2\nanother mocked line3\n";
    +           final long size2 = myString2.length();
    +           final Path filePath2 = createTempFilePath(myString2);
    +
    +           final long totalSize = size + size2;
    +
    +           DelimitedInputFormat<String> format = new MyTextInputFormat();
    +           format.setFilePaths(filePath.toUri().toString(), 
filePath2.toUri().toString());
    +
    +           FileInputFormat.FileBaseStatistics stats = 
format.getStatistics(null);
    +           assertNotNull(stats);
    +           assertEquals("The file size from the statistics is wrong.", 
totalSize, stats.getTotalInputSize());
    +   }
    +   
    +   @Test
    +   public void testGetStatisticsFileDoesNotExist() throws IOException {
    +           DelimitedInputFormat<String> format = new MyTextInputFormat();
    +           format.setFilePaths("file:///path/does/not/really/exist", 
"file:///another/path/that/does/not/exist");
    +
    +           FileBaseStatistics stats = format.getStatistics(null);
    +           assertNull("The file statistics should be null.", stats);
    +   }
    +
    +   @Test
    +   public void testGetStatisticsSingleFileWithCachedVersion() throws 
IOException {
    +           final String myString = "my mocked line 1\nmy mocked line 2\n";
    +           final Path tempFile = createTempFilePath(myString);
    +           final long size = myString.length();
    +           final long cachedSize = 10065;
    +
    +           DelimitedInputFormat<String> format = new MyTextInputFormat();
    +           format.setFilePath(tempFile);
    +           format.configure(new Configuration());
    +
    +           FileBaseStatistics stats = format.getStatistics(null);
    +           assertNotNull(stats);
    +           assertEquals("The file size from the statistics is wrong.", 
size, stats.getTotalInputSize());
    +           
    +           format = new MyTextInputFormat();
    +           format.setFilePath(tempFile);
    +           format.configure(new Configuration());
    +           
    +           FileBaseStatistics newStats = format.getStatistics(stats);
    +           assertEquals("Statistics object was changed.", newStats, stats);
    +           
    +           // insert fake stats with the correct modification time. the 
call should return the fake stats
    +           format = new MyTextInputFormat();
    +           format.setFilePath(tempFile);
    +           format.configure(new Configuration());
    +           
    +           FileBaseStatistics fakeStats = new 
FileBaseStatistics(stats.getLastModificationTime(), cachedSize, 
BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
    +           BaseStatistics latest = format.getStatistics(fakeStats);
    +           assertEquals("The file size from the statistics is wrong.", 
cachedSize, latest.getTotalInputSize());
    +           
    +           // insert fake stats with the expired modification time. the 
call should return new accurate stats
    +           format = new MyTextInputFormat();
    +           format.setFilePath(tempFile);
    +           format.configure(new Configuration());
    +           
    +           FileBaseStatistics outDatedFakeStats = new 
FileBaseStatistics(stats.getLastModificationTime()-1, cachedSize, 
BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
    --- End diff --
    
    Missing spaces around `-` in `stats.getLastModificationTime()-1`; it should read `stats.getLastModificationTime() - 1`.


> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-3655
>                 URL: https://issues.apache.org/jira/browse/FLINK-3655
>             Project: Flink
>          Issue Type: Improvement
>          Components: Core
>    Affects Versions: 1.0.0
>            Reporter: Gna Phetsarath
>            Assignee: Fabian Hueske
>            Priority: Major
>              Labels: starter
>             Fix For: 1.5.0
>
>
> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat so that a DataSource will process the directories 
> sequentially.
>    
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>    env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to