GitHub user jkff opened a pull request:

    https://github.com/apache/beam/pull/3023

    [BEAM-2052] Allow dynamic sharding in windowed file sinks

    This is a slightly modified and rearranged version of @reuvenlax's #2647.
    
    My concerns about it are:
    
    1) In the direct runner, the integration tests of dynamic sharding are vacuous, 
because the direct runner replaces unspecified sharding with fixed sharding in 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
 (applied at 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java#L217).
 However, this is a testing-only concern: other runners don't have this 
override, so overall the testing is non-vacuous. It is just hard to test 
against the direct runner, and I suspect that we want these tests to be 
non-vacuous in the direct runner too.
    
    2) When I removed that override for testing purposes, I noticed that 
a very large number of files is being written, primarily, I guess, 
because the bundles are very small. The number is so large that the test 
time for batch with dynamic sharding grows from 21 seconds to 5 minutes. In 
particular, we write many, many files for each window/pane, presumably because 
in streaming runners and in the direct runner there's at least 1 bundle per key, 
and we create at least 1 file per bundle in 
WriteFiles.Write(Windowed,Unwindowed)Bundles.
    
    Reuven, can you please comment on whether this "at least 1 file per key" is 
expected behavior in a streaming runner? I suspect that it's not, but then I'm 
not sure how to fix the PR semantically.
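    To make the arithmetic behind point 2 concrete, here is a toy Java model. It is not Beam code: the class and method names (`FileCountSketch`, `bundlesPerKey`, `minFilesPerPane`) are hypothetical, and it simply assumes (a) exactly one bundle per distinct key and (b) at least one output file per bundle. Under those assumptions, the number of files written for a single window/pane is bounded below by the number of distinct keys in that pane.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model, not Beam's WriteFiles implementation.
 * Assumption 1: the runner creates exactly one bundle per distinct key.
 * Assumption 2: each non-empty bundle writes at least one file.
 */
public class FileCountSketch {

  /** Groups element keys into bundles (one per distinct key) and returns each bundle's size. */
  static Map<String, Integer> bundlesPerKey(List<String> elementKeys) {
    Map<String, Integer> bundleSizes = new LinkedHashMap<>();
    for (String key : elementKeys) {
      bundleSizes.merge(key, 1, Integer::sum);
    }
    return bundleSizes;
  }

  /** Lower bound on files written for one pane: one file per non-empty bundle. */
  static int minFilesPerPane(List<String> elementKeys) {
    return bundlesPerKey(elementKeys).size();
  }

  public static void main(String[] args) {
    // Six elements spread over three keys: three bundles, so at least three files.
    List<String> pane = List.of("a", "b", "a", "c", "b", "a");
    System.out.println(minFilesPerPane(pane));
  }
}
```

    Because the bound is per pane, it compounds across windows and firings: many small panes over many keys multiply the file count, which is consistent with the slowdown described above.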
    
    CC: @reuvenlax @davorbonaci @dhalperi 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jkff/incubator-beam finish-pr-2647-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/3023.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3023
    
----
commit c03781cbbba1d80e1ad5c93165bdad6bebd05c53
Author: Reuven Lax <[email protected]>
Date:   2017-04-05T19:13:44Z

    Implement dynamic-sharding for windowed file outputs, and add an 
integration test.

commit c43cc4abe7ef81a7a9155ac686eed46af24eb7c0
Author: Reuven Lax <[email protected]>
Date:   2017-05-09T20:02:12Z

    Renames FileBasedSink inner classes
    
    FileBasedWriteOperation -> WriteOperation, FileBasedWriter -> Writer

commit 3347c6e49725a3648bd944b9543425518e2f77e1
Author: Eugene Kirpichov <[email protected]>
Date:   2017-05-09T22:10:07Z

    Simpler code for setting shard numbers on results in FileBasedSink

commit b775df16594d30538c2b5b0af0d17a179060960c
Author: Eugene Kirpichov <[email protected]>
Date:   2017-05-09T22:25:57Z

    Splits WriteBundles into windowed/unwindowed versions

----


