FYI, the Dataflow runner automatically sets the default number of shards
(I believe to 2 * num_workers). We should probably do something
similar for the Flink runner.
This needs to be done by the runner, as the number of workers is a runner
concept; the SDK itself has no concept of workers.
On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
If I do not specify the number of shards for an unbounded collection, I get:

Caused by: java.lang.IllegalArgumentException: When applying WriteFiles to an unbounded PCollection, must specify number of output shards explicitly
    at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
    at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)

Around the same lines in WriteFiles there is also a check for windowed writes.
I believe FileIO enables windowed writes explicitly when windowing is present.
On the filesystem, files are written per window and per shard.
On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <m...@apache.org> wrote:
I agree it would be nice to keep the current distribution of elements
instead of doing a shuffle based on an artificial shard key.
Have you tried `withWindowedWrites()`? Also, why do you say you need to
specify the number of shards in streaming mode?
-Max
On 25.10.18 10:12, Jozef Vilcek wrote:
> Hm, yes, this makes sense now, but what can be done for my case? I do
> not want to end up with too many files on disk.
>
> I think what I am looking for is a way to instruct the IO not to assign
> random shards and reshuffle again, but to assume that the number of shards
> equals the number of workers and that the shard ID is the worker ID.
> Is this doable in the Beam model?
>
> On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <m...@apache.org> wrote:
>
> The FlinkRunner uses a hash function (MurmurHash) on each key, which
> places keys somewhere in the hash space. The hash space (2^32) is split
> among the partitions (5 in your case). Given enough keys, the chance
> increases that they are spread evenly.
>
> This should be similar to what the other Runners do.
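
A minimal sketch of the hashing scheme described above, not the FlinkRunner's actual code. It assumes Scala's standard-library `MurmurHash3.stringHash` as a stand-in hash function, hypothetical shard key names of the form `shard-<i>`, and an equal split of the unsigned 32-bit hash space into `partitions` slices.

```scala
import scala.util.hashing.MurmurHash3

object HashSpread {
  // Map a key to one of `partitions` equal slices of the unsigned 32-bit hash space.
  def partitionFor(key: String, partitions: Int): Int = {
    val unsigned = MurmurHash3.stringHash(key).toLong & 0xFFFFFFFFL
    ((unsigned * partitions) >>> 32).toInt
  }

  // Count how many of `numKeys` hypothetical shard keys land in each partition.
  def spread(numKeys: Int, partitions: Int): Vector[Int] = {
    val counts = Array.fill(partitions)(0)
    (0 until numKeys).foreach { i =>
      counts(partitionFor(s"shard-$i", partitions)) += 1
    }
    counts.toVector
  }
}
```

Printing `HashSpread.spread(5, 5)` next to `HashSpread.spread(50, 5)` gives a feel for how the per-partition counts behave with few keys versus many. The exact counts depend on the hash and on the key names, so they will differ from what a real runner produces.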
>
> On 24.10.18 10:58, Jozef Vilcek wrote:
> >
> > So if I run 5 workers with 50 shards, I end up with:
> >
> > Duration   Bytes received   Records received
> > 2m 39s     900 MB           465,525
> > 2m 39s     1.76 GB          930,720
> > 2m 39s     789 MB           407,315
> > 2m 39s     1.32 GB          698,262
> > 2m 39s     788 MB           407,310
> > Still not good, but better than with 5 shards, where some workers did not
> > participate at all.
> > So, is the problem in some layer which distributes keys / shards among workers?
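
A rough way to read the record counts above is to estimate how many of the 50 shards each worker ended up with. This sketch assumes that every shard holds about the same number of records, which the thread does not confirm; `ShardEstimate` and its members are hypothetical names.

```scala
object ShardEstimate {
  // Records received per worker, copied from the table above.
  val recordsPerWorker: Vector[Long] =
    Vector(465525L, 930720L, 407315L, 698262L, 407310L)

  // Estimate shards per worker, assuming every shard holds about the same number of records.
  def shardsPerWorker(records: Vector[Long], totalShards: Int): Vector[Int] = {
    val total = records.sum.toDouble
    records.map(r => math.round(r * totalShards / total).toInt)
  }
}
```

`ShardEstimate.shardsPerWorker(ShardEstimate.recordsPerWorker, 50)` yields `Vector(8, 16, 7, 12, 7)`, which sums to 50. Under the stated assumption, the numbers are consistent with an uneven assignment of whole shards to workers rather than with uneven shard sizes.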
> >
> > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <re...@google.com> wrote:
> >
> > withNumShards(5) generates 5 random shards. It turns out that
> > statistically, when you generate 5 random shards and you have 5
> > workers, the probability is reasonably high that some workers will get
> > more than one shard (and as a result not all workers will
> > participate). Are you able to set the number of shards larger than 5?
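
A minimal sketch of the arithmetic behind this claim. It assumes an idealized model in which each shard lands on a worker uniformly and independently of the others; a real runner's assignment may differ. `ShardOdds` and its methods are hypothetical names.

```scala
object ShardOdds {
  // Binomial coefficient C(n, k) as a Double.
  def choose(n: Int, k: Int): Double =
    (1 to k).foldLeft(1.0)((acc, i) => acc * (n - k + i) / i)

  // Probability that every one of `workers` receives at least one of `shards`,
  // computed by inclusion-exclusion over the set of idle workers.
  def allWorkersBusy(shards: Int, workers: Int): Double =
    (0 to workers).map { k =>
      val sign = if (k % 2 == 0) 1.0 else -1.0
      sign * choose(workers, k) * math.pow((workers - k).toDouble / workers, shards)
    }.sum

  // Expected number of workers that receive no shard at all.
  def expectedIdle(shards: Int, workers: Int): Double =
    workers * math.pow(1.0 - 1.0 / workers, shards)
}
```

For 5 shards on 5 workers, `allWorkersBusy(5, 5)` is 5!/5^5 = 0.0384, so under this model there is roughly a 96% chance that at least one worker gets nothing, and `expectedIdle(5, 5)` is about 1.64 workers. With 50 shards, `allWorkersBusy(50, 5)` is very close to 1, although the load per worker can still be uneven.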
> >
> > On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >
> > cc (dev)
> >
> > I tried to run the example with FlinkRunner in batch mode and
> > again got a bad data spread among the workers.
> >
> > When I tried to remove the number of shards for batch mode in the above
> > example, the pipeline crashed before launch:
> >
> > Caused by: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers:
> > AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
> > AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
> >
> > On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >
> > Hi Max,
> >
> > I forgot to mention that the example is run in streaming mode,
> > therefore I cannot do writes without specifying shards.
> > FileIO explicitly asks for them.
> >
> > I am not sure where the problem is. FlinkRunner is the only one
> > I have used.
> >
> > On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <m...@apache.org> wrote:
> >
> > Hi Jozef,
> >
> > This does not look like a FlinkRunner-related problem, but is caused by
> > the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle,
> > which apparently does not lead to a good data spread in your case.
> >
> > Do you see the same behavior without `withNumShards(5)`?
> >
> > Thanks,
> > Max
> >
> > On 22.10.18 11:57, Jozef Vilcek wrote:
> > > Hello,
> > >
> > > I am having some trouble getting a balanced write via FileIO. Workers on
> > > the shuffle side, where the data for each window firing is written to the
> > > filesystem, receive an unbalanced number of events.
> > >
> > > Here is a naive code example:
> > >
> > > val read = KafkaIO.read()
> > >   .withTopic("topic")
> > >   .withBootstrapServers("kafka1:9092")
> > >   .withKeyDeserializer(classOf[ByteArrayDeserializer])
> > >   .withValueDeserializer(classOf[ByteArrayDeserializer])
> > >   .withProcessingTime()
> > >
> > > pipeline
> > >   .apply(read)
> > >   .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
> > >     override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
> > >       new String(input.getKV.getValue, "UTF-8")
> > >     }
> > >   }))
> > >   .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> > >     .triggering(AfterWatermark.pastEndOfWindow()
> > >       .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> > >       .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> > >         Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> > >         Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
> > >     .discardingFiredPanes()
> > >     .withAllowedLateness(Duration.standardDays(7)))
> > >   .apply(FileIO.write()
> > >     .via(TextIO.sink())
> > >     .withNaming(new SafeFileNaming(outputPath, ".txt"))
> > >     .withTempDirectory(tempLocation)
> > >     .withNumShards(5))
> > >
> > >
> > > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the
> > > number of shards), I would expect each worker to participate equally in
> > > persisting shards, since the code uses a fixed number of shards
> > > (and random shard assignment?). But reality is different (see the 2
> > > attachments: statistics from the Flink task reading from Kafka and the
> > > task writing to files).
> > >
> > > What am I missing? How can I achieve balanced writes?
> > >
> > > Thanks,
> > > Jozef
> > >