GitHub user jkff opened a pull request:
https://github.com/apache/beam/pull/3023
[BEAM-2052] Allow dynamic sharding in windowed file sinks
This is a slightly modified and rearranged version of @reuvenlax's #2647.
My concerns about it are:
1) In the direct runner, the integration tests of dynamic sharding are vacuous,
because the direct runner replaces unspecified sharding with fixed sharding in
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
(applied at
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java#L217).
However, this is a testing-only concern: other runners don't have this
override, so overall the testing is not vacuous. It is just hard to test
against the direct runner, and I suspect we want these tests to be
non-vacuous in the direct runner too.
2) When I removed that override for testing purposes, I noticed that a very
large number of files was being written, primarily (I guess) because the
bundles are very small. The number is so large that the running time of the
batch test with dynamic sharding grows from 21 seconds to 5 minutes. In
particular, we write many, many files for each window/pane, presumably because
in streaming runners and in the direct runner there's at least 1 bundle per key,
and we create at least 1 file per bundle in
WriteFiles.Write(Windowed,Unwindowed)Bundles.
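To make the arithmetic in concern 2 concrete, here is a toy Java model. It is hypothetical, not Beam code: `ShardingFileCountModel` and its methods are invented for illustration. It assumes, as described above, one bundle per key and one file per window in each bundle, and compares that with fixed sharding, where the number of files per window is bounded by the shard count regardless of bundling.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model, not Beam code. Assumptions: (a) the runner creates one
// bundle per key, and (b) each bundle opens one writer (one file) per window it sees.
class ShardingFileCountModel {
  static final class Element {
    final String key;
    final long window;

    Element(String key, long window) {
      this.key = key;
      this.window = window;
    }
  }

  // Models a runner that puts every key into its own bundle.
  static List<List<Element>> bundlePerKey(List<Element> elements) {
    Map<String, List<Element>> byKey = new LinkedHashMap<>();
    for (Element e : elements) {
      byKey.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);
    }
    return new ArrayList<>(byKey.values());
  }

  // Dynamic sharding under assumption (b): one file per (bundle, window) pair.
  static int dynamicShardingFiles(List<List<Element>> bundles) {
    int files = 0;
    for (List<Element> bundle : bundles) {
      Set<Long> windows = new HashSet<>();
      for (Element e : bundle) {
        windows.add(e.window);
      }
      files += windows.size();
    }
    return files;
  }

  // Fixed sharding: an upper bound of numShards files per non-empty window.
  static int fixedShardingMaxFiles(List<List<Element>> bundles, int numShards) {
    Set<Long> windows = new HashSet<>();
    for (List<Element> bundle : bundles) {
      for (Element e : bundle) {
        windows.add(e.window);
      }
    }
    return windows.size() * numShards;
  }

  public static void main(String[] args) {
    // 100 keys, each with one element in each of 3 windows.
    List<Element> elements = new ArrayList<>();
    for (int key = 0; key < 100; key++) {
      for (long window = 0; window < 3; window++) {
        elements.add(new Element("k" + key, window));
      }
    }
    List<List<Element>> bundles = bundlePerKey(elements);
    System.out.println("dynamic: " + dynamicShardingFiles(bundles)); // dynamic: 300
    System.out.println("fixed (max): " + fixedShardingMaxFiles(bundles, 5)); // fixed (max): 15
  }
}
```

In this model the file count under dynamic sharding scales with the number of keys (bundles), while under fixed sharding it depends only on the number of windows. That would also explain why the direct runner's fixed-sharding override hides the problem.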
Reuven, can you please comment on whether this "at least 1 file per key"
behavior is expected in a streaming runner? I suspect it's not, but then I'm
not sure how to fix the PR semantically.
CC: @reuvenlax @davorbonaci @dhalperi
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/jkff/incubator-beam finish-pr-2647-2
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/beam/pull/3023.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3023
----
commit c03781cbbba1d80e1ad5c93165bdad6bebd05c53
Author: Reuven Lax <[email protected]>
Date: 2017-04-05T19:13:44Z
Implement dynamic-sharding for windowed file outputs, and add an
integration test.
commit c43cc4abe7ef81a7a9155ac686eed46af24eb7c0
Author: Reuven Lax <[email protected]>
Date: 2017-05-09T20:02:12Z
Renames FileBasedSink inner classes
FileBasedWriteOperation -> WriteOperation, FileBasedWriter -> Writer
commit 3347c6e49725a3648bd944b9543425518e2f77e1
Author: Eugene Kirpichov <[email protected]>
Date: 2017-05-09T22:10:07Z
Simpler code for setting shard numbers on results in FileBasedSink
commit b775df16594d30538c2b5b0af0d17a179060960c
Author: Eugene Kirpichov <[email protected]>
Date: 2017-05-09T22:25:57Z
Splits WriteBundles into windowed/unwindowed versions
----