Repository: apex-core
Updated Branches:
  refs/heads/master 8ae80fee1 -> eedd467e0

APEXCORE-502 Unnecessary byte array copy in DefaultKryoStreamCodec.toByteArray

Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/c210dc21
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/c210dc21
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/c210dc21

Branch: refs/heads/master
Commit: c210dc212a8ac2ecd3d83ce21e72960f35c2e9db
Parents: 3119aba
Author: Vlad Rozov <[email protected]>
Authored: Tue Aug 16 18:46:14 2016 -0700
Committer: Vlad Rozov <[email protected]>
Committed: Tue Aug 16 18:46:14 2016 -0700

----------------------------------------------------------------------
 .../plan/logical/DefaultKryoStreamCodec.java | 37 +++++++-------------
 1 file changed, 12 insertions(+), 25 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-core/blob/c210dc21/engine/src/main/java/com/datatorrent/stram/plan/logical/DefaultKryoStreamCodec.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/DefaultKryoStreamCodec.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/DefaultKryoStreamCodec.java
index 578a9d7..8d0532b 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/DefaultKryoStreamCodec.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/DefaultKryoStreamCodec.java
@@ -18,17 +18,12 @@
  */
 package com.datatorrent.stram.plan.logical;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import com.google.common.base.Throwables;
 
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.common.util.SerializableObject;
@@ -56,37 +51,29 @@ public class DefaultKryoStreamCodec<T> extends SerializableObject implements Str
   @Override
   public Object fromByteArray(Slice fragment)
   {
+    final Input input = new Input(fragment.buffer, fragment.offset, fragment.length);
     try {
-      ByteArrayInputStream is = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length);
-      Input input = new Input(is);
-      Object returnObject = kryo.readClassAndObject(input);
-      is.close();
-      return returnObject;
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
+      return kryo.readClassAndObject(input);
+    } finally {
+      input.close();
     }
   }
 
   @Override
-  public Slice toByteArray(T info)
+  public Slice toByteArray(T o)
   {
-    Slice slice = null;
+    final Output output = new Output(32, -1);
     try {
-      ByteArrayOutputStream os = new ByteArrayOutputStream();
-      Output output = new Output(os);
-      kryo.writeClassAndObject(output, info);
-      output.flush();
-      slice = new Slice(os.toByteArray(), 0, os.toByteArray().length);
-      os.close();
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
+      kryo.writeClassAndObject(output, o);
+    } finally {
+      output.close();
     }
-    return slice;
+    return new Slice(output.getBuffer(), 0, output.position());
   }
 
   @Override
-  public int getPartition(T t)
+  public int getPartition(T o)
   {
-    return t.hashCode();
+    return o.hashCode();
   }
 }
