[
https://issues.apache.org/jira/browse/BEAM-1177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15758852#comment-15758852
]
Amit Sela commented on BEAM-1177:
---------------------------------
Instead of simply emitting {{Iterable<byte[]>}} per partition, I'll emit
{{Tuple2<Iterable<byte[]>, Metadata>}} to be able to report read count and
watermark per batch.
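{code}
// Assumed import: Beam's Java SDK uses Joda-Time for timestamps.
import org.joda.time.Instant;

/** Read metadata emitted alongside each partition's serialized elements. */
class Metadata {
  private final long numRecords;   // number of records read
  private final Instant watermark; // watermark reported by the read

  public Metadata(long numRecords, Instant watermark) {
    this.numRecords = numRecords;
    this.watermark = watermark;
  }

  public long getNumRecords() {
    return numRecords;
  }

  public Instant getWatermark() {
    return watermark;
  }
}
{code}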
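As a rough illustration of how such per-partition metadata could be rolled up into a per-batch report, here is a minimal sketch. Everything in it is an assumption made for illustration: {{Pair}} stands in for {{scala.Tuple2}}, {{PartitionMetadata}} mirrors the {{Metadata}} class above but uses {{java.time.Instant}} so the sketch needs no extra dependencies, and taking the minimum watermark across partitions is only one plausible policy, not necessarily what the runner does.

```java
import java.time.Instant;
import java.util.List;

// Stand-in for scala.Tuple2 so the sketch has no Spark/Scala dependency.
final class Pair<A, B> {
  final A first;
  final B second;

  Pair(A first, B second) {
    this.first = first;
    this.second = second;
  }
}

// Mirrors the Metadata class above; java.time.Instant is used here
// (an assumption for self-containment) instead of Joda-Time's Instant.
final class PartitionMetadata {
  private final long numRecords;
  private final Instant watermark;

  PartitionMetadata(long numRecords, Instant watermark) {
    this.numRecords = numRecords;
    this.watermark = watermark;
  }

  long getNumRecords() {
    return numRecords;
  }

  Instant getWatermark() {
    return watermark;
  }
}

final class BatchReport {
  // Sums the read counts and keeps the earliest watermark across all
  // partitions of one batch. The "minimum watermark" policy is an assumption.
  static PartitionMetadata summarize(
      List<Pair<Iterable<byte[]>, PartitionMetadata>> partitions) {
    if (partitions.isEmpty()) {
      throw new IllegalArgumentException("batch has no partitions");
    }
    long total = 0;
    Instant low = null;
    for (Pair<Iterable<byte[]>, PartitionMetadata> partition : partitions) {
      PartitionMetadata md = partition.second;
      total += md.getNumRecords();
      if (low == null || md.getWatermark().isBefore(low)) {
        low = md.getWatermark();
      }
    }
    return new PartitionMetadata(total, low);
  }
}
```

The serialized elements ({{partition.first}}) are not touched by the summary, which is the point of carrying the metadata next to them: the count and watermark can be reported without decoding any user data.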
> Input DStream "bundles" should be in serialized form and include relevant
> metadata.
> -----------------------------------------------------------------------------------
>
> Key: BEAM-1177
> URL: https://issues.apache.org/jira/browse/BEAM-1177
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Reporter: Amit Sela
> Assignee: Amit Sela
>
> Currently, the input partitions hold "bundles" of read elements within the
> {{mapWithStateDStream}} used for the read.
> Since this stream is shuffled automatically, user data (the read elements)
> should be serialized using coders, to avoid failures when user data is not
> {{Kryo}} serializable.
> Even once BEAM-848 is complete, the resulting {{MapWithStateDStream}}
> will be checkpointed periodically, so the data will still have to remain in
> serialized form.
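The idea in the description above, keeping read elements as coder-encoded bytes so that shuffling and checkpointing never depend on the user type being {{Kryo}} serializable, can be sketched roughly as follows. This is not the Beam API: {{ElementCoder}}, {{StringElementCoder}}, and {{SerializedBundles}} are hypothetical names introduced only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal stand-in for a Beam-style coder (not the real Coder API).
interface ElementCoder<T> {
  void encode(T value, DataOutputStream out) throws IOException;

  T decode(DataInputStream in) throws IOException;
}

// Example coder for strings, using DataOutputStream's modified UTF-8 format.
final class StringElementCoder implements ElementCoder<String> {
  @Override
  public void encode(String value, DataOutputStream out) throws IOException {
    out.writeUTF(value);
  }

  @Override
  public String decode(DataInputStream in) throws IOException {
    return in.readUTF();
  }
}

final class SerializedBundles {
  // Encodes each element to its own byte[] so only bytes are shuffled or checkpointed.
  static <T> List<byte[]> encodeAll(Iterable<T> elements, ElementCoder<T> coder) {
    List<byte[]> encoded = new ArrayList<>();
    try {
      for (T element : elements) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream data = new DataOutputStream(bytes);
        coder.encode(element, data);
        data.flush();
        encoded.add(bytes.toByteArray());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return encoded;
  }

  // Decodes the bytes back into elements once they are needed downstream.
  static <T> List<T> decodeAll(Iterable<byte[]> encoded, ElementCoder<T> coder) {
    List<T> decoded = new ArrayList<>();
    try {
      for (byte[] bytes : encoded) {
        decoded.add(coder.decode(new DataInputStream(new ByteArrayInputStream(bytes))));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return decoded;
  }
}
```

With this shape, the stream only ever carries {{byte[]}} values, which any serializer can handle; the coder is applied at the read and again where the elements are consumed.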