Repository: apex-core Updated Branches: refs/heads/master 7106b766a -> 1e3d47bef
APEXCORE-648 Unnecessary byte array copy in DefaultStatefulStreamCodec.toDataStatePair() Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/1e3d47be Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/1e3d47be Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/1e3d47be Branch: refs/heads/master Commit: 1e3d47bef8b32d678b2a726fb2aaf6854f4650b9 Parents: 7106b76 Author: Vlad Rozov <[email protected]> Authored: Mon Feb 20 08:03:11 2017 -0800 Committer: Vlad Rozov <[email protected]> Committed: Wed Feb 22 22:30:18 2017 -0800 ---------------------------------------------------------------------- .../stram/codec/DefaultStatefulStreamCodec.java | 22 ++++----- .../stram/codec/StatefulStreamCodec.java | 6 +-- .../codec/DefaultStatefulStreamCodecTest.java | 50 ++++++++++++-------- 3 files changed, 43 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e3d47be/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java b/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java index 094c2e2..5bb1640 100644 --- a/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java +++ b/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java @@ -49,6 +49,7 @@ public class DefaultStatefulStreamCodec<T> extends Kryo implements StatefulStrea private final Output data; private final Output state; private final Input input; + private final DataStatePair dataStatePair; @SuppressWarnings("OverridableMethodCallInConstructor") public DefaultStatefulStreamCodec() @@ -63,10 +64,11 @@ public class DefaultStatefulStreamCodec<T> extends Kryo implements StatefulStrea classResolver = (ClassResolver)getClassResolver(); this.pairs = classResolver.pairs; classResolver.init(); + dataStatePair = new DataStatePair(); } @Override - public Object fromDataStatePair(DataStatePair dspair) + public T fromDataStatePair(DataStatePair dspair) { if (dspair.state != null) { try { @@ -93,7 +95,7 @@ public class DefaultStatefulStreamCodec<T> extends Kryo implements StatefulStrea // the following code does not need to be in the try-catch block. It can be // taken out of it, once the stability of the code is validated by 4/1/2014. try { - return readClassAndObject(input); + return (T)readClassAndObject(input); } catch (Throwable th) { logger.error("Catastrophic Error: Execution halted due to Kryo exception!", th); synchronized (this) { @@ -110,7 +112,6 @@ public class DefaultStatefulStreamCodec<T> extends Kryo implements StatefulStrea @Override public DataStatePair toDataStatePair(T o) { - DataStatePair pair = new DataStatePair(); data.setPosition(0); writeClassAndObject(data, o); @@ -121,14 +122,13 @@ public class DefaultStatefulStreamCodec<T> extends Kryo implements StatefulStrea } pairs.clear(); - // can we optimize this? - byte[] bytes = state.toBytes(); - pair.state = new Slice(bytes, 0, bytes.length); + dataStatePair.state = new Slice(state.getBuffer(), 0, state.position()); + } else { + dataStatePair.state = null; } - byte[] bytes = data.toBytes(); - pair.data = new Slice(bytes, 0, bytes.length); - return pair; + dataStatePair.data = new Slice(data.getBuffer(), 0, data.position()); + return dataStatePair; } @Override @@ -149,13 +149,13 @@ public class DefaultStatefulStreamCodec<T> extends Kryo implements StatefulStrea @Override public Object fromByteArray(Slice fragment) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException(); } @Override public Slice toByteArray(T o) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException(); } static class ClassIdPair http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e3d47be/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java b/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java index e531289..3797830 100644 --- a/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java +++ b/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java @@ -63,10 +63,10 @@ public interface StatefulStreamCodec<T> extends StreamCodec<T> /** * Create POJO from the byte array for consumption by the downstream. * - * @param dspair - * @return plain old java object, the type is intentionally not T since the consumer does not care about it. + * @param dspair serialized representation of the object + * @return plain old java object */ - Object fromDataStatePair(DataStatePair dspair); + T fromDataStatePair(DataStatePair dspair); /** * Serialize the POJO emitted by the upstream node to byte array so that http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e3d47be/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java index 42ac5dc..d1a18ae 100644 --- a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java @@ -33,7 +33,10 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.datatorrent.netlet.util.Slice; +import com.datatorrent.bufferserver.packet.DataTuple; +import com.datatorrent.bufferserver.packet.MessageType; +import com.datatorrent.bufferserver.packet.PayloadTuple; +import com.datatorrent.bufferserver.packet.Tuple; import com.datatorrent.stram.codec.DefaultStatefulStreamCodec.ClassIdPair; import com.datatorrent.stram.codec.StatefulStreamCodec.DataStatePair; @@ -121,36 +124,41 @@ public class DefaultStatefulStreamCodecTest @Test public void testCustomObject() { - DefaultStatefulStreamCodec<Object> coder = new DefaultStatefulStreamCodec<Object>(); - DefaultStatefulStreamCodec<Object> decoder = new DefaultStatefulStreamCodec<Object>(); + DefaultStatefulStreamCodec<TestClass> coder = new DefaultStatefulStreamCodec<>(); + DefaultStatefulStreamCodec<TestClass> decoder = coder.newInstance(); TestClass tc = new TestClass("hello!", 42); //String tc = "hello"; - DataStatePair dsp1 = coder.toDataStatePair(tc); - Slice state1 = dsp1.state; - DataStatePair dsp2 = coder.toDataStatePair(tc); - Slice state2 = dsp2.state; - assert (state1 != null); - assert (state2 == null); - Assert.assertEquals(dsp1.data, dsp2.data); + DataStatePair dsp = coder.toDataStatePair(tc); + Assert.assertNotNull(dsp.state); + byte[] state1 = DataTuple.getSerializedTuple(MessageType.CODEC_STATE_VALUE, dsp.state); + byte[] data1 = PayloadTuple.getSerializedTuple(0, dsp.data); - Object tcObject1 = decoder.fromDataStatePair(dsp1); - assert (tc.equals(tcObject1)); + dsp = coder.toDataStatePair(tc); + Assert.assertNull(dsp.state); + byte[] data2 = PayloadTuple.getSerializedTuple(0, dsp.data); - Object tcObject2 = decoder.fromDataStatePair(dsp2); - assert (tc.equals(tcObject2)); + Assert.assertNotSame(data1, data2); + Assert.assertArrayEquals(data1, data2); + + dsp.state = Tuple.getTuple(state1, 0, state1.length).getData(); + dsp.data = Tuple.getTuple(data1, 0, data1.length).getData(); + Assert.assertEquals(tc, decoder.fromDataStatePair(dsp)); + + dsp.state = null; + dsp.data = Tuple.getTuple(data2, 0, data2.length).getData(); + Assert.assertEquals(tc, decoder.fromDataStatePair(dsp)); coder.resetState(); - dsp2 = coder.toDataStatePair(tc); - state2 = dsp2.state; - Assert.assertEquals(state1, state2); + dsp = coder.toDataStatePair(tc); + Assert.assertArrayEquals(state1, DataTuple.getSerializedTuple(MessageType.CODEC_STATE_VALUE, dsp.state)); - dsp1 = coder.toDataStatePair(tc); - dsp2 = coder.toDataStatePair(tc); - Assert.assertEquals(dsp1.data, dsp2.data); - Assert.assertEquals(dsp1.state, dsp2.state); + Assert.assertNull(coder.toDataStatePair(tc).state); + data1 = PayloadTuple.getSerializedTuple(Integer.MAX_VALUE, coder.toDataStatePair(tc).data); + data2 = PayloadTuple.getSerializedTuple(Integer.MAX_VALUE, coder.toDataStatePair(tc).data); + Assert.assertArrayEquals(data1, data2); } public static class TestTuple
