[FLINK-2723] [core] CopyableValue method to copy into new instance This closes #1169
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e727355e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e727355e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e727355e Branch: refs/heads/master Commit: e727355e42bd0ad7d403aee703aaf33a68a839d2 Parents: 40cbf7e Author: Greg Hogan <[email protected]> Authored: Mon Sep 21 15:14:09 2015 -0400 Committer: Stephan Ewen <[email protected]> Committed: Tue Sep 29 12:18:07 2015 +0200 ---------------------------------------------------------------------- .../org/apache/flink/types/BooleanValue.java | 6 +- .../java/org/apache/flink/types/ByteValue.java | 7 +- .../java/org/apache/flink/types/CharValue.java | 7 +- .../org/apache/flink/types/CopyableValue.java | 30 ++++- .../org/apache/flink/types/DoubleValue.java | 7 +- .../java/org/apache/flink/types/FloatValue.java | 7 +- .../java/org/apache/flink/types/IntValue.java | 9 +- .../java/org/apache/flink/types/LongValue.java | 7 +- .../java/org/apache/flink/types/NullValue.java | 7 +- .../java/org/apache/flink/types/Record.java | 5 + .../java/org/apache/flink/types/ShortValue.java | 7 +- .../org/apache/flink/types/StringValue.java | 7 +- .../apache/flink/types/CopyableValueTest.java | 109 +++++++++++++++++++ 13 files changed, 203 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java index 071e650..e034648 100644 --- a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java @@ -48,7 +48,6 @@ public class BooleanValue implements 
NormalizableKey<BooleanValue>, ResettableVa this.value = value; } - public boolean get() { return value; } @@ -118,6 +117,11 @@ public class BooleanValue implements NormalizableKey<BooleanValue>, ResettableVa } @Override + public BooleanValue copy() { + return new BooleanValue(this.value); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { target.write(source, 1); } http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/ByteValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java index c2f1f10..40ed1ad 100644 --- a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java @@ -147,13 +147,18 @@ public class ByteValue implements NormalizableKey<ByteValue>, ResettableValue<By public int getBinaryLength() { return 1; } - + @Override public void copyTo(ByteValue target) { target.value = this.value; } @Override + public ByteValue copy() { + return new ByteValue(this.value); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { target.write(source, 1); } http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/CharValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/CharValue.java b/flink-core/src/main/java/org/apache/flink/types/CharValue.java index 3fd9f29..06b67c7 100644 --- a/flink-core/src/main/java/org/apache/flink/types/CharValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/CharValue.java @@ -151,13 +151,18 @@ public class CharValue implements NormalizableKey<CharValue>, ResettableValue<Ch public int getBinaryLength() { return 2; } 
- + @Override public void copyTo(CharValue target) { target.value = this.value; } @Override + public CharValue copy() { + return new CharValue(this.value); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { target.write(source, 2); } http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java b/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java index 57e0c46..3974cb2 100644 --- a/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java @@ -35,7 +35,35 @@ public interface CopyableValue<T> extends Value { */ int getBinaryLength(); + /** + * Performs a deep copy of this object into the {@code target} instance. + * + * @param target Object to copy into. + */ void copyTo(T target); - + + /** + * Performs a deep copy of this object into a new instance. + * + * This method is useful for generic user-defined functions to clone a + * {@link CopyableValue} when storing multiple objects. With object reuse + * a deep copy must be created and type erasure prevents calling new. + * + * @return New object with copied fields. + */ + T copy(); + + /** + * Copies the next serialized instance from {@code source} to {@code target}. + * + * This method is equivalent to calling {@code IOReadableWritable.read(DataInputView)} + * followed by {@code IOReadableWritable.write(DataOutputView)} but does not require + * intermediate deserialization. + * + * @param source Data source for serialized instance. + * @param target Data target for serialized instance. 
+ * + * @see org.apache.flink.core.io.IOReadableWritable + */ void copy(DataInputView source, DataOutputView target) throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java index abcbcf5..3158e40 100644 --- a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java @@ -122,13 +122,18 @@ public class DoubleValue implements Key<DoubleValue>, ResettableValue<DoubleValu public int getBinaryLength() { return 8; } - + @Override public void copyTo(DoubleValue target) { target.value = this.value; } @Override + public DoubleValue copy() { + return new DoubleValue(this.value); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { target.write(source, 8); } http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/FloatValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java index d57c74b..5364203 100644 --- a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java @@ -121,13 +121,18 @@ public class FloatValue implements Key<FloatValue>, ResettableValue<FloatValue>, public int getBinaryLength() { return 4; } - + @Override public void copyTo(FloatValue target) { target.value = this.value; } @Override + public FloatValue copy() { + return new FloatValue(this.value); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { 
target.write(source, 4); } http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/IntValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/IntValue.java b/flink-core/src/main/java/org/apache/flink/types/IntValue.java index 423c8c1..1b893f0 100644 --- a/flink-core/src/main/java/org/apache/flink/types/IntValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/IntValue.java @@ -139,7 +139,7 @@ public class IntValue implements NormalizableKey<IntValue>, ResettableValue<IntV } } else { - target.putIntBigEndian(offset, value - Integer.MIN_VALUE); + target.putIntBigEndian(offset, value - Integer.MIN_VALUE); for (int i = 4; i < len; i++) { target.put(offset + i, (byte) 0); } @@ -152,13 +152,18 @@ public class IntValue implements NormalizableKey<IntValue>, ResettableValue<IntV public int getBinaryLength() { return 4; } - + @Override public void copyTo(IntValue target) { target.value = this.value; } @Override + public IntValue copy() { + return new IntValue(this.value); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { target.write(source, 4); } http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/LongValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/LongValue.java b/flink-core/src/main/java/org/apache/flink/types/LongValue.java index e8fcd53..2b6cb1f 100644 --- a/flink-core/src/main/java/org/apache/flink/types/LongValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/LongValue.java @@ -160,13 +160,18 @@ public class LongValue implements NormalizableKey<LongValue>, ResettableValue<Lo public int getBinaryLength() { return 8; } - + @Override public void copyTo(LongValue target) { target.value = this.value; } @Override + 
public LongValue copy() { + return new LongValue(this.value); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { target.write(source, 8); } http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/NullValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/NullValue.java b/flink-core/src/main/java/org/apache/flink/types/NullValue.java index 5391b7b..aa56536 100644 --- a/flink-core/src/main/java/org/apache/flink/types/NullValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/NullValue.java @@ -110,12 +110,17 @@ public final class NullValue implements NormalizableKey<NullValue>, CopyableValu public int getBinaryLength() { return 1; } - + @Override public void copyTo(NullValue target) { } @Override + public NullValue copy() { + return NullValue.getInstance(); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { source.readBoolean(); target.writeBoolean(false); http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/Record.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/Record.java b/flink-core/src/main/java/org/apache/flink/types/Record.java index 218d6ce..24ff979 100644 --- a/flink-core/src/main/java/org/apache/flink/types/Record.java +++ b/flink-core/src/main/java/org/apache/flink/types/Record.java @@ -763,6 +763,11 @@ public final class Record implements Value, CopyableValue<Record> { } @Override + public Record copy() { + return createCopy(); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { int val = source.readUnsignedByte(); target.writeByte(val); 
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/ShortValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java index ec03497..f18ce7f 100644 --- a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java @@ -154,13 +154,18 @@ public class ShortValue implements NormalizableKey<ShortValue>, ResettableValue< public int getBinaryLength() { return 2; } - + @Override public void copyTo(ShortValue target) { target.value = this.value; } @Override + public ShortValue copy() { + return new ShortValue(this.value); + } + + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { target.write(source, 2); } http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/StringValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/StringValue.java b/flink-core/src/main/java/org/apache/flink/types/StringValue.java index db0f184..2249019 100644 --- a/flink-core/src/main/java/org/apache/flink/types/StringValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/StringValue.java @@ -682,7 +682,12 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence, target.ensureSize(this.len); System.arraycopy(this.value, 0, target.value, 0, this.len); } - + + @Override + public StringValue copy() { + return new StringValue(this); + } + @Override public void copy(DataInputView in, DataOutputView target) throws IOException { int len = in.readUnsignedByte(); http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java new file mode 100644 index 0000000..76bdece --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.types; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class CopyableValueTest { + + @Test + public void testCopy() { + CopyableValue<?>[] value_types = new CopyableValue[] { + new BooleanValue(true), + new ByteValue((byte) 42), + new CharValue('q'), + new DoubleValue(3.1415926535897932), + new FloatValue((float) 3.14159265), + new IntValue(42), + new LongValue(42l), + new NullValue(), + new ShortValue((short) 42), + new StringValue("QED") + }; + + for (CopyableValue<?> type : value_types) { + assertEquals(type, type.copy()); + } + } + + @Test + public void testCopyTo() { + BooleanValue boolean_from = new BooleanValue(true); + BooleanValue boolean_to = new BooleanValue(false); + + boolean_from.copyTo(boolean_to); + assertEquals(boolean_from, boolean_to); + + ByteValue byte_from = new ByteValue((byte) 3); + ByteValue byte_to = new ByteValue((byte) 7); + + byte_from.copyTo(byte_to); + assertEquals(byte_from, byte_to); + + CharValue char_from = new CharValue('α'); + CharValue char_to = new CharValue('ω'); + + char_from.copyTo(char_to); + assertEquals(char_from, char_to); + + DoubleValue double_from = new DoubleValue(2.7182818284590451); + DoubleValue double_to = new DoubleValue(0); + + double_from.copyTo(double_to); + assertEquals(double_from, double_to); + + FloatValue float_from = new FloatValue((float) 2.71828182); + FloatValue float_to = new FloatValue((float) 1.41421356); + + float_from.copyTo(float_to); + assertEquals(float_from, float_to); + + IntValue int_from = new IntValue(8191); + IntValue int_to = new IntValue(131071); + + int_from.copyTo(int_to); + assertEquals(int_from, int_to); + + LongValue long_from = new LongValue(524287); + LongValue long_to = new LongValue(2147483647); + + long_from.copyTo(long_to); + assertEquals(long_from, long_to); + + NullValue null_from = new NullValue(); + NullValue null_to = new NullValue(); + + null_from.copyTo(null_to); + assertEquals(null_from, null_to); + + 
ShortValue short_from = new ShortValue((short) 31); + ShortValue short_to = new ShortValue((short) 127); + + short_from.copyTo(short_to); + assertEquals(short_from, short_to); + + StringValue string_from = new StringValue("2305843009213693951"); + StringValue string_to = new StringValue("618970019642690137449562111"); + + string_from.copyTo(string_to); + assertEquals(string_from, string_to); + } +}
