Repository: incubator-apex-core Updated Branches: refs/heads/release-3.1 e568f7918 -> 624ffe60d
APEX-121 #resolve Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/07312f53 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/07312f53 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/07312f53 Branch: refs/heads/release-3.1 Commit: 07312f534977a1571adf5223da522cafb8a8ec84 Parents: 77a86ac Author: Gaurav <[email protected]> Authored: Sat Sep 12 22:50:11 2015 -0700 Committer: Gaurav <[email protected]> Committed: Tue Sep 29 09:19:09 2015 -0700 ---------------------------------------------------------------------- .../com/datatorrent/stram/stream/BufferServerPublisher.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/07312f53/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java index d8e09c8..39cf667 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java @@ -112,7 +112,14 @@ public class BufferServerPublisher extends Publisher implements ByteCounterStrea * if there is any state write that for the subscriber before we write the data. */ if (dsp.state != null) { - write(DataTuple.getSerializedTuple(MessageType.CODEC_STATE_VALUE, dsp.state)); + array = DataTuple.getSerializedTuple(MessageType.CODEC_STATE_VALUE, dsp.state); + try { + while (!write(array)) { + sleep(5); + } + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } } /* * Now that the state if any has been sent, we can proceed with the actual data we want to send.
