[Gearpump 311] refactor state management
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/7068699d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/7068699d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/7068699d
Branch: refs/heads/state
Commit: 7068699d3e865f4d5bd49bcfb59f2cd4115ec125
Parents: b9f1086
Author: vinoyang <[email protected]>
Authored: Wed Jun 28 11:41:15 2017 +0800
Committer: vinoyang <[email protected]>
Committed: Sun Jul 23 09:59:05 2017 +0800
----------------------------------------------------------------------
 .../streaming/refactor/coder/AtomicCoder.java | 48 ++
 .../refactor/coder/BigEndianIntegerCoder.java | 83 ++++
 .../refactor/coder/BigEndianLongCoder.java | 83 ++++
 .../refactor/coder/BigIntegerCoder.java | 74 +++
 .../BufferedElementCountingOutputStream.java | 125 +++++
 .../refactor/coder/ByteArrayCoder.java | 94 ++++
 .../streaming/refactor/coder/ByteCoder.java | 88 ++++
 .../streaming/refactor/coder/Coder.java | 134 +++++
 .../refactor/coder/CoderException.java | 32 ++
 .../streaming/refactor/coder/CoderUtils.java | 286 +++++++++++
 .../streaming/refactor/coder/DoubleCoder.java | 86 ++++
 .../ElementByteSizeObservableIterable.java | 45 ++
 .../ElementByteSizeObservableIterator.java | 30 ++
 .../refactor/coder/ElementByteSizeObserver.java | 66 +++
 .../streaming/refactor/coder/IterableCoder.java | 41 ++
 .../refactor/coder/IterableLikeCoder.java | 238 +++++++++
 .../refactor/coder/IteratorObserver.java | 56 +++
 .../streaming/refactor/coder/ListCoder.java | 47 ++
 .../streaming/refactor/coder/MapCoder.java | 138 ++++++
 .../streaming/refactor/coder/SetCoder.java | 49 ++
 .../refactor/coder/StringUtf8Coder.java | 99 ++++
 .../refactor/coder/StructuralByteArray.java | 55 +++
 .../refactor/coder/StructuredCoder.java | 95 ++++
 .../streaming/refactor/coder/VarInt.java | 91 ++++
 .../streaming/refactor/coder/VarIntCoder.java | 82 ++++
 .../streaming/refactor/coder/VarLongCoder.java | 88 ++++
 .../streaming/refactor/coder/VoidCoder.java | 66 +++
 .../dsl/window/impl/ReduceFnRunner.scala | 6 +
 .../refactor/sink/DataSinkProcessor.scala | 17 +
 .../state/InMemoryGlobalStateInternals.scala | 269 +++++++++++
 .../refactor/state/RuntimeContext.scala | 6 +
 .../streaming/refactor/state/StateBinder.scala | 35 ++
 .../refactor/state/StateNamespace.scala | 29 ++
 .../refactor/state/StateNamespaces.scala | 61 +++
 .../streaming/refactor/state/StateSpecs.scala | 213 ++++++++
 .../streaming/refactor/state/StateTags.scala | 109 +++++
 .../streaming/refactor/state/StatefulTask.scala | 11 +
 .../streaming/refactor/state/api/BagState.scala | 27 ++
 .../refactor/state/api/CombiningState.scala | 31 ++
 .../refactor/state/api/GroupingState.scala | 29 ++
 .../streaming/refactor/state/api/MapState.scala | 39 ++
 .../refactor/state/api/ReadableState.scala | 27 ++
 .../streaming/refactor/state/api/SetState.scala | 33 ++
 .../refactor/state/api/StateInternals.scala | 29 ++
 .../state/api/StateInternalsFactory.scala | 25 +
 .../refactor/state/api/ValueState.scala | 27 ++
 .../state/heap/HeapStateInternals.scala | 305 ++++++++++++
 .../state/heap/HeapStateInternalsFactory.scala | 62 +++
 .../state/heap/HeapStateInternalsProxy.scala | 52 ++
 .../state/heap/HeapStateInternalsSpec.scala | 484 +++++++++++++++++++
 50 files changed, 4345 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/AtomicCoder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/AtomicCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/AtomicCoder.java
new file mode 100644
index 0000000..e152b48
--- /dev/null
+++
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/AtomicCoder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AtomicCoder<T> extends StructuredCoder<T> { // Coder with no component coders; instances are equal iff same runtime class.
+
+  @Override
+  public void verifyDeterministic() {} // never throws; subclasses may override
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() { // no argument coders
+    return Collections.emptyList();
+  }
+
+  @Override
+  public final List<? extends Coder<?>> getComponents() { // final: always empty for atomic coders
+    return Collections.emptyList();
+  }
+
+  @Override
+  public final boolean equals(Object other) { // equal iff other has exactly the same runtime class
+    return other != null && this.getClass().equals(other.getClass());
+  }
+
+  @Override
+  public final int hashCode() { // consistent with equals(): hashes the runtime class
+    return this.getClass().hashCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianIntegerCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianIntegerCoder.java
new file mode 100644
index 0000000..27ec539
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianIntegerCoder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.gearpump.streaming.refactor.coder; + +import java.io.*; + +public class BigEndianIntegerCoder extends AtomicCoder<Integer> { + + public static BigEndianIntegerCoder of() { + return INSTANCE; + } + + ///////////////////////////////////////////////////////////////////////////// + + private static final BigEndianIntegerCoder INSTANCE = new BigEndianIntegerCoder(); + + private BigEndianIntegerCoder() {} + + @Override + public void encode(Integer value, OutputStream outStream) + throws CoderException { + if (value == null) { + throw new CoderException("cannot encode a null Integer"); + } + + try { + new DataOutputStream(outStream).writeInt(value); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Integer decode(InputStream inStream) + throws CoderException { + try { + return new DataInputStream(inStream).readInt(); + } catch (EOFException | UTFDataFormatException exn) { + // These exceptions correspond to decoding problems, so change + // what kind of exception they're branded as. 
+ throw new CoderException(exn); + } catch (IOException e) { + throw new CoderException(e); + } + } + + @Override + public void verifyDeterministic() {} + + @Override + public boolean consistentWithEquals() { + return true; + } + + @Override + public boolean isRegisterByteSizeObserverCheap(Integer value) { + return true; + } + + @Override + protected long getEncodedElementByteSize(Integer value) { + if (value == null) { + throw new CoderException("cannot encode a null Integer"); + } + return 4; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianLongCoder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianLongCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianLongCoder.java new file mode 100644 index 0000000..c788729 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianLongCoder.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+
+public class BigEndianLongCoder extends AtomicCoder<Long> { // Long as 8 big-endian bytes (DataOutputStream.writeLong)
+
+  public static BigEndianLongCoder of() { // shared singleton
+    return INSTANCE;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private static final BigEndianLongCoder INSTANCE = new BigEndianLongCoder();
+
+  private BigEndianLongCoder() {} // private: obtain instances via of()
+
+  @Override
+  public void encode(Long value, OutputStream outStream) // writes exactly 8 bytes; null is rejected
+      throws CoderException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null Long");
+    }
+    try {
+      new DataOutputStream(outStream).writeLong(value);
+    } catch (IOException e) {
+      throw new CoderException(e);
+    }
+  }
+
+  @Override
+  public Long decode(InputStream inStream) // reads exactly 8 bytes; stream errors surface as CoderException
+      throws CoderException {
+    try {
+      return new DataInputStream(inStream).readLong();
+    } catch (EOFException | UTFDataFormatException exn) {
+      // These exceptions correspond to decoding problems, so change
+      // what kind of exception they're branded as.
+      throw new CoderException(exn);
+    } catch (IOException e) {
+      throw new CoderException(e);
+    }
+  }
+
+  @Override
+  public void verifyDeterministic() { // no-op: a given Long always encodes to the same 8 bytes
+  }
+
+  @Override
+  public boolean consistentWithEquals() { // equal Longs encode to identical bytes
+    return true;
+  }
+
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(Long value) { // size is a constant
+    return true;
+  }
+
+  @Override
+  protected long getEncodedElementByteSize(Long value) { // always 8 bytes; null is rejected as in encode()
+    if (value == null) {
+      throw new CoderException("cannot encode a null Long");
+    }
+    return 8;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigIntegerCoder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigIntegerCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigIntegerCoder.java
new file mode 100644
index 0000000..4a65992
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigIntegerCoder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.math.BigInteger;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class BigIntegerCoder extends AtomicCoder<BigInteger> { // BigInteger as its toByteArray() bytes, written via ByteArrayCoder (length-prefixed)
+
+  public static BigIntegerCoder of() { // shared singleton
+    return INSTANCE;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private static final BigIntegerCoder INSTANCE = new BigIntegerCoder();
+  private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of(); // writes the length prefix and raw bytes
+
+  private BigIntegerCoder() { // private: obtain instances via of()
+  }
+
+  @Override
+  public void encode(BigInteger value, OutputStream outStream) // NOTE(review): null is rejected by checkNotNull (NullPointerException), unlike the CoderException used by sibling coders
+      throws CoderException {
+    checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName()));
+    BYTE_ARRAY_CODER.encode(value.toByteArray(), outStream);
+  }
+
+  @Override
+  public BigInteger decode(InputStream inStream) // inverse of encode(): rebuilds the value from the decoded bytes
+      throws CoderException {
+    return new BigInteger(BYTE_ARRAY_CODER.decode(inStream));
+  }
+
+  @Override
+  public void verifyDeterministic() { // deterministic iff the byte[] coder is
+    BYTE_ARRAY_CODER.verifyDeterministic();
+  }
+
+  @Override
+  public boolean consistentWithEquals() { // equal BigIntegers produce identical toByteArray() output
+    return true;
+  }
+
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(BigInteger value) { // NOTE(review): the size computation still allocates via toByteArray()
+    return true;
+  }
+
+  @Override
+  protected long getEncodedElementByteSize(BigInteger value) { // same size as the length-prefixed byte[] encoding
+    checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName()));
+    return BYTE_ARRAY_CODER.getEncodedElementByteSize(value.toByteArray());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BufferedElementCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BufferedElementCountingOutputStream.java
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BufferedElementCountingOutputStream.java
new file mode 100644
index 0000000..119e6eb
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BufferedElementCountingOutputStream.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class BufferedElementCountingOutputStream extends OutputStream { // Buffers element bytes; each flushed batch is prefixed with a VarInt element count, and finish() writes a 0 count as terminator.
+  public static final int DEFAULT_BUFFER_SIZE = 64 * 1024; // 64 KiB
+  private final ByteBuffer buffer;
+  private final OutputStream os;
+  private boolean finished; // set by finish(); later writes/marks throw IOException
+  private long count; // elements marked since the last batch was output; 0 means writes bypass the buffer
+
+  public BufferedElementCountingOutputStream(OutputStream os) { // uses DEFAULT_BUFFER_SIZE
+    this(os, DEFAULT_BUFFER_SIZE);
+  }
+
+  BufferedElementCountingOutputStream(OutputStream os, int bufferSize) { // package-private: explicit buffer size
+    this.buffer = ByteBuffer.allocate(bufferSize);
+    this.os = os;
+    this.finished = false;
+    this.count = 0;
+  }
+
+  public void finish() throws IOException { // idempotent: flushes buffered data, then writes the 0-count terminator
+    if (finished) {
+      return;
+    }
+    flush();
+    // Finish the stream by stating that there are 0 elements that follow.
+    VarInt.encode(0, os);
+    finished = true;
+  }
+
+  public void markElementStart() throws IOException { // counts one more element in the current batch
+    if (finished) {
+      throw new IOException("Stream has been finished. Can not add any more elements.");
+    }
+    count++;
+  }
+
+  @Override
+  public void write(int b) throws IOException { // buffered while an element is pending; otherwise written through
+    if (finished) {
+      throw new IOException("Stream has been finished. Can not write any more data.");
+    }
+    if (count == 0) {
+      os.write(b);
+      return;
+    }
+
+    if (buffer.hasRemaining()) {
+      buffer.put((byte) b);
+    } else {
+      outputBuffer();
+      os.write(b);
+    }
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException { // a chunk that does not fit flushes the batch and is written through
+    if (finished) {
+      throw new IOException("Stream has been finished. Can not write any more data.");
+    }
+    if (count == 0) {
+      os.write(b, off, len);
+      return;
+    }
+
+    if (buffer.remaining() >= len) {
+      buffer.put(b, off, len);
+    } else {
+      outputBuffer();
+      os.write(b, off, len);
+    }
+  }
+
+  @Override
+  public void flush() throws IOException { // no-op once finished
+    if (finished) {
+      return;
+    }
+    outputBuffer();
+    os.flush();
+  }
+
+  @Override
+  public void close() throws IOException { // finish() first so the terminator is written, then close the wrapped stream
+    finish();
+    os.close();
+  }
+
+  // Output the buffer if it contains any data.
+  private void outputBuffer() throws IOException {
+    if (count > 0) {
+      VarInt.encode(count, os);
+      // We are using a heap based buffer and not a direct buffer so it is safe to access
+      // the underlying array.
+      os.write(buffer.array(), buffer.arrayOffset(), buffer.position());
+      buffer.clear();
+      // The buffer has been flushed so we must write to the underlying stream until
+      // we learn of the next element. We reset the count to zero marking that we should
+      // not use the buffer.
+      count = 0;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteArrayCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteArrayCoder.java
new file mode 100644
index 0000000..6b1af05
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteArrayCoder.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.io.ByteStreams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class ByteArrayCoder extends AtomicCoder<byte[]> { // byte[] as a VarInt length followed by the raw bytes
+
+  public static ByteArrayCoder of() { // shared singleton
+    return INSTANCE;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private static final ByteArrayCoder INSTANCE = new ByteArrayCoder();
+
+  private ByteArrayCoder() { // private: obtain instances via of()
+  }
+
+  @Override
+  public void encode(byte[] value, OutputStream outStream) // VarInt length, then the bytes; null is rejected
+      throws CoderException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null byte[]");
+    }
+
+    try {
+      VarInt.encode(value.length, outStream);
+      outStream.write(value);
+    } catch (IOException e) {
+      throw new CoderException(e);
+    }
+  }
+
+  @Override
+  public byte[] decode(InputStream inStream) // reads the VarInt length, then exactly that many bytes
+      throws CoderException {
+    byte[] value = null;
+    try {
+      int length = VarInt.decodeInt(inStream);
+      if (length < 0) { // reject a corrupt (negative) length prefix
+        throw new CoderException("invalid length " + length);
+      }
+      value = new byte[length];
+
+      ByteStreams.readFully(inStream, value); // a short stream raises EOFException, wrapped as CoderException below
+    } catch (IOException e) {
+      throw new CoderException(e);
+    }
+    return value;
+  }
+
+  @Override
+  public void verifyDeterministic() { // no-op: the same bytes always produce the same encoding
+  }
+
+  @Override
+  public Object structuralValue(byte[] value) { // arrays use identity equals, so wrap for value comparison
+    return new StructuralByteArray(value);
+  }
+
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(byte[] value) { // size follows from the array length
+    return true;
+  }
+
+  @Override
+  protected long getEncodedElementByteSize(byte[] value) { // VarInt prefix length + payload length
+    if (value == null) {
+      throw new CoderException("cannot encode a null byte[]");
+    }
+    return VarInt.getLength(value.length) + value.length;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteCoder.java
----------------------------------------------------------------------
diff --git
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteCoder.java
new file mode 100644
index 0000000..e3cb7e4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteCoder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+
+public class ByteCoder extends AtomicCoder<Byte> { // Byte as a single raw byte
+
+  public static ByteCoder of() { // shared singleton
+    return INSTANCE;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private static final ByteCoder INSTANCE = new ByteCoder();
+
+  private ByteCoder() { // private: obtain instances via of()
+  }
+
+  @Override
+  public void encode(Byte value, OutputStream outStream) // writes exactly one byte; null is rejected
+      throws CoderException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null Byte");
+    }
+    try {
+      outStream.write(value.byteValue());
+    } catch (IOException e) {
+      throw new CoderException(e);
+    }
+  }
+
+  @Override
+  public Byte decode(InputStream inStream) throws CoderException { // reads exactly one byte; EOF surfaces as CoderException
+    try {
+      // value will be between 0-255, -1 for EOF
+      int value = inStream.read();
+      if (value == -1) {
+        throw new EOFException("EOF encountered decoding 1 byte from input stream");
+      }
+      return (byte) value;
+    } catch (EOFException | UTFDataFormatException exn) {
+      // These exceptions correspond to decoding problems, so change
+      // what kind of exception they're branded as.
+      throw new CoderException(exn);
+    } catch (IOException e) {
+      throw new CoderException(e);
+    }
+  }
+
+  @Override
+  public void verifyDeterministic() { // no-op: a given Byte always encodes to the same byte
+  }
+
+  @Override
+  public boolean consistentWithEquals() { // equal Bytes encode to the same byte
+    return true;
+  }
+
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(Byte value) { // size is a constant
+    return true;
+  }
+
+  @Override
+  protected long getEncodedElementByteSize(Byte value) { // always 1 byte; null is rejected
+    if (value == null) {
+      throw new CoderException("cannot estimate size for unsupported null value");
+    }
+    return 1;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
new file mode 100644
index 0000000..e1999ed
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

package org.apache.gearpump.streaming.refactor.coder;

import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.io.ByteStreams;
import com.google.common.io.CountingOutputStream;

import java.io.*;
import java.util.Arrays;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;

/**
 * Base class for objects that encode values of type {@code T} to an {@link OutputStream} and
 * decode them back from an {@link InputStream}.
 *
 * <p>Encoding and decoding failures are reported with the unchecked {@link CoderException}.
 *
 * @param <T> the type of values this coder handles
 */
public abstract class Coder<T> implements Serializable {

  /**
   * Writes the encoded form of {@code value} to {@code outStream}.
   *
   * @throws CoderException if {@code value} cannot be encoded
   */
  public abstract void encode(T value, OutputStream outStream)
      throws CoderException;

  /**
   * Reads one encoded value from {@code inStream}.
   *
   * @throws CoderException if the input cannot be decoded
   */
  public abstract T decode(InputStream inStream) throws CoderException;

  /** Returns the coders of this coder's component types (for example, an element coder). */
  public abstract List<? extends Coder<?>> getCoderArguments();

  /**
   * Throws {@link NonDeterministicException} if this coder cannot guarantee a deterministic
   * encoding.
   */
  public abstract void verifyDeterministic() throws Coder.NonDeterministicException;

  /**
   * Calls {@link #verifyDeterministic()} on each coder in {@code coders}. The first failure is
   * rethrown as a {@link NonDeterministicException} attributed to {@code target}, with
   * {@code message} as the reason and the original failure as the cause.
   */
  public static void verifyDeterministic(Coder<?> target, String message, Iterable<Coder<?>> coders)
      throws NonDeterministicException {
    for (Coder<?> coder : coders) {
      try {
        coder.verifyDeterministic();
      } catch (NonDeterministicException e) {
        throw new NonDeterministicException(target, message, e);
      }
    }
  }

  /** Varargs form of {@link #verifyDeterministic(Coder, String, Iterable)}. */
  public static void verifyDeterministic(Coder<?> target, String message, Coder<?>... coders)
      throws NonDeterministicException {
    verifyDeterministic(target, message, Arrays.asList(coders));
  }

  /**
   * When {@code true}, {@link #structuralValue} returns non-null values as-is instead of wrapping
   * their encoded bytes. Defaults to {@code false}.
   */
  public boolean consistentWithEquals() {
    return false;
  }

  /**
   * Returns an object to use for equality comparisons of {@code value}: the value itself when it
   * is non-null and {@link #consistentWithEquals()} is true, otherwise a
   * {@link StructuralByteArray} wrapping its encoding.
   *
   * @throws IllegalArgumentException if the value cannot be encoded
   */
  public Object structuralValue(T value) {
    if (value != null && consistentWithEquals()) {
      return value;
    } else {
      try {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        encode(value, os);
        return new StructuralByteArray(os.toByteArray());
      } catch (Exception exn) {
        throw new IllegalArgumentException(
            "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
      }
    }
  }

  /** Returns whether {@link #registerByteSizeObserver} is cheap for {@code value}; false by default. */
  public boolean isRegisterByteSizeObserverCheap(T value) {
    return false;
  }

  /** Reports the encoded size of {@code value} to {@code observer} (by default by encoding it). */
  public void registerByteSizeObserver(T value, ElementByteSizeObserver observer) {
    observer.update(getEncodedElementByteSize(value));
  }

  /**
   * Returns the number of bytes {@link #encode} writes for {@code value}, measured by encoding
   * into a counting stream that discards its output.
   *
   * @throws IllegalArgumentException if the value cannot be encoded
   */
  protected long getEncodedElementByteSize(T value) {
    try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) {
      encode(value, os);
      return os.getCount();
    } catch (Exception exn) {
      throw new IllegalArgumentException(
          "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
    }
  }

  /** Thrown when a coder cannot guarantee a deterministic encoding; carries one or more reasons. */
  public static class NonDeterministicException extends RuntimeException {
    private final Coder<?> coder;
    private final List<String> reasons;

    public NonDeterministicException(
        Coder<?> coder, String reason, NonDeterministicException e) {
      this(coder, Arrays.asList(reason), e);
    }

    public NonDeterministicException(Coder<?> coder, String reason) {
      this(coder, Arrays.asList(reason), null);
    }

    public NonDeterministicException(Coder<?> coder, List<String> reasons) {
      this(coder, reasons, null);
    }

    /**
     * @param coder the offending coder
     * @param reasons non-empty list of explanations
     * @param cause the nested failure that triggered this one, or null
     * @throws IllegalArgumentException if {@code reasons} is empty
     */
    public NonDeterministicException(
        Coder<?> coder,
        List<String> reasons,
        NonDeterministicException cause) {
      super(cause);
      checkArgument(reasons.size() > 0, "Reasons must not be empty.");
      this.reasons = reasons;
      this.coder = coder;
    }

    public Iterable<String> getReasons() {
      return reasons;
    }

    @Override
    public String getMessage() {
      // "%n" is only expanded inside a format string. The separator must therefore be formatted
      // on its own; passing "%n " to Joiner directly put a literal "%n" between reasons.
      String separator = String.format("%n ");
      return String.format("%s is not deterministic because:%n %s",
          coder, Joiner.on(separator).join(reasons));
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderException.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderException.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderException.java new file mode 100644 index 0000000..8213e42 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.gearpump.streaming.refactor.coder;

/**
 * Unchecked exception raised when a {@link Coder} cannot encode or decode a value.
 */
public class CoderException extends RuntimeException {

  /** Creates an exception that only wraps {@code cause}. */
  public CoderException(Throwable cause) {
    super(cause);
  }

  /** Creates an exception with a description and no underlying cause. */
  public CoderException(String message) {
    super(message);
  }

  /** Creates an exception with a description and the {@code cause} that triggered it. */
  public CoderException(String message, Throwable cause) {
    super(message, cause);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderUtils.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderUtils.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderUtils.java new file mode 100644 index 0000000..2126c48 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderUtils.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.gearpump.streaming.refactor.coder;

import com.google.common.io.BaseEncoding;

import java.io.*;
import java.lang.ref.SoftReference;

/**
 * Static helpers for encoding values to byte arrays or Base64 strings with a {@link Coder}, and
 * decoding them back.
 */
public final class CoderUtils {
  private CoderUtils() {
  } // Non-instantiable

  /**
   * Per-thread output buffer reused by {@link #encodeToByteArray}. It is held through a
   * {@link SoftReference} so the garbage collector may reclaim it.
   */
  private static final ThreadLocal<SoftReference<ExposedByteArrayOutputStream>>
      threadLocalOutputStream = new ThreadLocal<>();

  /** Whether this thread's reusable buffer is in use; guards against recursive encoding. */
  private static final ThreadLocal<Boolean> threadLocalOutputStreamInUse =
      new ThreadLocal<Boolean>() {
        @Override
        protected Boolean initialValue() {
          return false;
        }
      };

  /**
   * Encodes {@code value} with {@code coder} into a new byte array.
   *
   * <p>Uses the per-thread buffer unless this method is already running on the current thread
   * (for example from inside another coder's {@code encode}). In that case a fresh buffer is
   * allocated.
   *
   * @throws IllegalArgumentException if the coder throws {@link CoderException}
   */
  public static <T> byte[] encodeToByteArray(Coder<T> coder, T value)
      throws CoderException {
    if (threadLocalOutputStreamInUse.get()) {
      // encodeToByteArray() is called recursively and the thread local stream is in use,
      // allocating a new one.
      ByteArrayOutputStream stream = new ExposedByteArrayOutputStream();
      encodeToSafeStream(coder, value, stream);
      return stream.toByteArray();
    } else {
      threadLocalOutputStreamInUse.set(true);
      try {
        ByteArrayOutputStream stream = getThreadLocalOutputStream();
        encodeToSafeStream(coder, value, stream);
        return stream.toByteArray();
      } finally {
        threadLocalOutputStreamInUse.set(false);
      }
    }
  }

  /**
   * Encodes into an in-memory stream that the coder is not allowed to close.
   *
   * <p>NOTE(review): every {@link CoderException}, including ones a coder raises for invalid
   * values, is converted to {@link IllegalArgumentException} here, even though callers declare
   * {@code throws CoderException}. Confirm this is intended before relying on either type.
   */
  private static <T> void encodeToSafeStream(
      Coder<T> coder, T value, OutputStream stream) throws CoderException {
    try {
      coder.encode(value, new UnownedOutputStream(stream));
    } catch (CoderException exn) {
      throw new IllegalArgumentException(
          "Forbidden IOException when writing to OutputStream", exn);
    }
  }

  /**
   * Decodes exactly one value from {@code encodedValue}.
   *
   * @throws CoderException if bytes remain after the value has been decoded
   * @throws IllegalArgumentException if the coder throws {@link CoderException}
   */
  public static <T> T decodeFromByteArray(
      Coder<T> coder, byte[] encodedValue) throws CoderException {
    try (ExposedByteArrayInputStream stream = new ExposedByteArrayInputStream(encodedValue)) {
      T result = decodeFromSafeStream(coder, stream);
      if (stream.available() != 0) {
        throw new CoderException(
            stream.available() + " unexpected extra bytes after decoding " + result);
      }
      return result;
    }
  }

  /** Decodes from a stream the coder is not allowed to close; CoderException becomes IAE. */
  private static <T> T decodeFromSafeStream(
      Coder<T> coder, InputStream stream) throws CoderException {
    try {
      return coder.decode(new UnownedInputStream(stream));
    } catch (CoderException exn) {
      throw new IllegalArgumentException(
          "Forbidden IOException when reading from InputStream", exn);
    }
  }

  /** Returns this thread's reusable buffer after reset(), creating it if absent or collected. */
  private static ByteArrayOutputStream getThreadLocalOutputStream() {
    SoftReference<ExposedByteArrayOutputStream> refStream = threadLocalOutputStream.get();
    ExposedByteArrayOutputStream stream = refStream == null ? null : refStream.get();
    if (stream == null) {
      stream = new ExposedByteArrayOutputStream();
      threadLocalOutputStream.set(new SoftReference<>(stream));
    }
    stream.reset();
    return stream;
  }

  /** Returns a deep copy of {@code value} made by encoding and then decoding it. */
  public static <T> T clone(Coder<T> coder, T value) throws CoderException {
    return decodeFromByteArray(coder, encodeToByteArray(coder, value));
  }

  /** Encodes {@code value} and returns it as URL-safe Base64 without padding. */
  public static <T> String encodeToBase64(Coder<T> coder, T value)
      throws CoderException {
    byte[] rawValue = encodeToByteArray(coder, value);
    return BaseEncoding.base64Url().omitPadding().encode(rawValue);
  }

  /** Decodes a value from URL-safe, unpadded Base64 produced by {@link #encodeToBase64}. */
  public static <T> T decodeFromBase64(Coder<T> coder, String encodedValue) throws CoderException {
    return decodeFromSafeStream(
        coder,
        new ByteArrayInputStream(BaseEncoding.base64Url().omitPadding().decode(encodedValue)));
  }

  /**
   * {@link ByteArrayOutputStream} that can adopt a caller-supplied array as its contents without
   * copying ({@link #writeAndOwn}). In that case {@link #toByteArray()} returns the array itself.
   */
  public static class ExposedByteArrayOutputStream extends ByteArrayOutputStream {

    /** The stream's own buffer, parked while {@code buf} is an array adopted by writeAndOwn. */
    private byte[] swappedBuffer;

    /** True once ordinary writes were made, i.e. the contents live in the stream's own buffer. */
    private boolean isFallback = false;

    /** Switches to ordinary mode, copying any adopted array back into the stream's own buffer. */
    private void fallback() {
      isFallback = true;
      if (swappedBuffer != null) {
        // swappedBuffer != null means buf is actually provided by the caller of writeAndOwn(),
        // while swappedBuffer is the original buffer.
        // Recover the buffer and copy the bytes from buf.
        byte[] tempBuffer = buf;
        count = 0;
        buf = swappedBuffer;
        super.write(tempBuffer, 0, tempBuffer.length);
        swappedBuffer = null;
      }
    }

    /**
     * Writes {@code b}. If the stream is empty, {@code b} is adopted as the internal buffer
     * without copying, so {@link #toByteArray()} may return {@code b} itself. Callers should
     * not modify it afterwards.
     */
    public void writeAndOwn(byte[] b) throws IOException {
      if (b.length == 0) {
        return;
      }
      if (count == 0) {
        // Optimized first-time whole write.
        // The original buffer will be swapped to swappedBuffer, while the input b is used as buf.
        swappedBuffer = buf;
        buf = b;
        count = b.length;
        // buf now holds the adopted array, so the stream is no longer in fallback mode. A stale
        // flag can be left by a zero-length write followed by reset(), which returns early when
        // count == 0. Such a flag would make reset() keep the caller's array as buf, and later
        // writes would overwrite it.
        isFallback = false;
      } else {
        fallback();
        super.write(b);
      }
    }

    @Override
    public void write(byte[] b, int off, int len) {
      fallback();
      super.write(b, off, len);
    }

    @Override
    public void write(int b) {
      fallback();
      super.write(b);
    }

    @Override
    public byte[] toByteArray() {
      // Note: count == buf.length is not a correct criteria to "return buf;", because the internal
      // buf may be reused after reset().
      if (!isFallback && count > 0) {
        return buf;
      } else {
        return super.toByteArray();
      }
    }

    @Override
    public void reset() {
      if (count == 0) {
        return;
      }
      count = 0;
      if (isFallback) {
        isFallback = false;
      } else {
        buf = swappedBuffer;
        swappedBuffer = null;
      }
    }
  }

  /** {@link ByteArrayInputStream} exposing its remaining bytes and with an unchecked close(). */
  public static class ExposedByteArrayInputStream extends ByteArrayInputStream {

    public ExposedByteArrayInputStream(byte[] buf) {
      super(buf);
    }

    /** Returns all unread bytes; returns the backing array itself if nothing was read yet. */
    public byte[] readAll() throws IOException {
      if (pos == 0 && count == buf.length) {
        pos = count;
        return buf;
      }
      byte[] ret = new byte[count - pos];
      super.read(ret);
      return ret;
    }

    @Override
    public void close() {
      try {
        super.close();
      } catch (IOException exn) {
        throw new RuntimeException("Unexpected IOException closing ByteArrayInputStream", exn);
      }
    }
  }

  /** Wraps an output stream owned by someone else: writes pass through, close() is forbidden. */
  public static class UnownedOutputStream extends FilterOutputStream {
    public UnownedOutputStream(OutputStream delegate) {
      super(delegate);
    }

    /**
     * Forwards the whole range to the wrapped stream in one call.
     * {@link FilterOutputStream#write(byte[], int, int)} would forward it one byte at a time.
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
    }

    @Override
    public void close() throws IOException {
      throw new UnsupportedOperationException("Caller does not own the underlying output stream "
          + " and should not call close().");
    }

    @Override
    public boolean equals(Object obj) {
      return obj instanceof UnownedOutputStream
          && ((UnownedOutputStream) obj).out.equals(out);
    }

    @Override
    public int hashCode() {
      return out.hashCode();
    }

  }

  /** Wraps an input stream owned by someone else: close(), mark() and reset() are forbidden. */
  public static class UnownedInputStream extends FilterInputStream {
    public UnownedInputStream(InputStream delegate) {
      super(delegate);
    }

    @Override
    public void close() throws IOException {
      throw new UnsupportedOperationException("Caller does not own the underlying input stream "
          + " and should not call close().");
    }

    @Override
    public boolean equals(Object obj) {
      return obj instanceof UnownedInputStream
          && ((UnownedInputStream) obj).in.equals(in);
    }

    @Override
    public int hashCode() {
      return in.hashCode();
    }

    @SuppressWarnings("UnsynchronizedOverridesSynchronized")
    @Override
    public void mark(int readlimit) {
      throw new UnsupportedOperationException("Caller does not own the underlying input stream "
          + " and should not call mark().");
    }

    @Override
    public boolean markSupported() {
      return false;
    }

    @SuppressWarnings("UnsynchronizedOverridesSynchronized")
    @Override
    public void reset() throws IOException {
      throw new UnsupportedOperationException("Caller does not own the underlying input stream "
          + " and should not call reset().");
    }

  }

}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/DoubleCoder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/DoubleCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/DoubleCoder.java new file mode 100644 index 0000000..981bee2 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/DoubleCoder.java @@ -0,0 +1,86 @@ +/* + *
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.gearpump.streaming.refactor.coder;

import java.io.*;

/**
 * Coder for non-null {@link Double} values. Each value is written as the 8 bytes produced by
 * {@link DataOutputStream#writeDouble} and read back with {@link DataInputStream#readDouble}.
 */
public class DoubleCoder extends AtomicCoder<Double> {

  /** The single shared instance; the coder holds no state. */
  private static final DoubleCoder INSTANCE = new DoubleCoder();

  private DoubleCoder() {
  }

  /** Returns the shared {@code DoubleCoder}. */
  public static DoubleCoder of() {
    return INSTANCE;
  }

  @Override
  public void encode(Double value, OutputStream outStream)
      throws CoderException {
    if (value == null) {
      throw new CoderException("cannot encode a null Double");
    }
    DataOutputStream dataOut = new DataOutputStream(outStream);
    try {
      dataOut.writeDouble(value);
    } catch (IOException ioe) {
      throw new CoderException(ioe);
    }
  }

  @Override
  public Double decode(InputStream inStream)
      throws CoderException {
    DataInputStream dataIn = new DataInputStream(inStream);
    try {
      return dataIn.readDouble();
    } catch (IOException ioe) {
      // Truncated input (EOFException) and any other stream failure are all surfaced
      // uniformly as CoderException.
      throw new CoderException(ioe);
    }
  }

  /** Always fails: floating-point encodings are treated as non-deterministic. */
  @Override
  public void verifyDeterministic() {
    throw new NonDeterministicException(this,
        "Floating point encodings are not guaranteed to be deterministic.");
  }

  @Override
  public boolean consistentWithEquals() {
    return true;
  }

  @Override
  public boolean isRegisterByteSizeObserverCheap(Double value) {
    return true;
  }

  /** Every non-null value occupies exactly 8 bytes. */
  @Override
  protected long getEncodedElementByteSize(Double value) {
    if (value == null) {
      throw new CoderException("cannot encode a null Double");
    }
    return 8;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterable.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterable.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterable.java new file mode 100644 index 0000000..29b4aa5 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterable.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.gearpump.streaming.refactor.coder;

import java.util.ArrayList;
import java.util.List;
import java.util.Observer;

/**
 * An {@link Iterable} whose iterators are {@link ElementByteSizeObservableIterator}s.
 *
 * <p>Observers registered with {@link #addObserver} are attached to the next iterator returned
 * by {@link #iterator()} and then forgotten. Each registration therefore applies to one
 * iteration only.
 */
public abstract class ElementByteSizeObservableIterable<
    V, InputT extends ElementByteSizeObservableIterator<V>>
    implements Iterable<V> {

  /** Observers waiting to be attached to the next iterator. */
  private final List<Observer> observers = new ArrayList<>();

  /** Creates a fresh iterator over this iterable's elements. */
  protected abstract InputT createIterator();

  /** Queues {@code observer} for attachment to the next iterator. */
  public void addObserver(Observer observer) {
    observers.add(observer);
  }

  @Override
  public InputT iterator() {
    InputT fresh = createIterator();
    for (Observer pending : observers) {
      fresh.addObserver(pending);
    }
    observers.clear();
    return fresh;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterator.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterator.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterator.java new file mode 100644 index 0000000..946882b --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterator.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.gearpump.streaming.refactor.coder;

import java.util.Iterator;
import java.util.Observable;

/**
 * An {@link Iterator} that is also an {@link Observable}. Subclasses call
 * {@link #notifyValueReturned} to report the byte size of each element they return.
 */
public abstract class ElementByteSizeObservableIterator<V>
    extends Observable implements Iterator<V> {

  /** Marks this observable as changed and sends {@code byteSize}, boxed as a Long, to observers. */
  protected final void notifyValueReturned(long byteSize) {
    setChanged();
    notifyObservers(Long.valueOf(byteSize));
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObserver.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObserver.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObserver.java new file mode 100644 index 0000000..5464067 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObserver.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.gearpump.streaming.refactor.coder;

import java.util.Observable;
import java.util.Observer;

/**
 * Accumulates byte sizes, multiplied by a scaling factor, and hands the total to
 * {@link #reportElementSize} on each {@link #advance()}.
 */
public abstract class ElementByteSizeObserver implements Observer {
  private boolean isLazy = false;
  private long totalSize = 0;
  private double scalingFactor = 1.0;

  public ElementByteSizeObserver() {
  }

  /** Receives the accumulated size of one element when {@link #advance()} is called. */
  protected abstract void reportElementSize(long elementByteSize);

  /** Flags that sizes will arrive later, during iteration; cleared by {@link #advance()}. */
  public void setLazy() {
    isLazy = true;
  }

  public boolean getIsLazy() {
    return isLazy;
  }

  /** Adds a size given as a Long or Integer; same as {@code update(null, obj)}. */
  public void update(Object obj) {
    update(null, obj);
  }

  /** Sets the multiplier applied to every subsequent size update. */
  public void setScalingFactor(double scalingFactor) {
    this.scalingFactor = scalingFactor;
  }

  /**
   * Adds {@code obj}, which must be a Long or Integer, scaled by the current scaling factor.
   *
   * @throws AssertionError for any other argument type
   */
  @Override
  public void update(Observable obs, Object obj) {
    boolean isSize = obj instanceof Long || obj instanceof Integer;
    if (!isSize) {
      throw new AssertionError("unexpected parameter object");
    }
    // The scaled product is a double; the compound assignment truncates it back to long.
    totalSize += scalingFactor * ((Number) obj).longValue();
  }

  /** Reports the accumulated total, then clears the total and the lazy flag. */
  public void advance() {
    reportElementSize(totalSize);
    totalSize = 0;
    isLazy = false;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableCoder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableCoder.java new file mode 100644 index 0000000..d069068 --- /dev/null +++
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableCoder.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.gearpump.streaming.refactor.coder;

import java.util.List;

/**
 * Coder for plain {@link Iterable} values. The wire format comes from {@link IterableLikeCoder};
 * decoding returns the decoded element list as the iterable.
 */
public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {

  protected IterableCoder(Coder<T> elemCoder) {
    super(elemCoder, "Iterable");
  }

  /** Returns a coder for iterables whose elements are encoded with {@code elemCoder}. */
  public static <T> IterableCoder<T> of(Coder<T> elemCoder) {
    return new IterableCoder<>(elemCoder);
  }

  /** The decoded list already satisfies {@code Iterable<T>}; it is returned as-is. */
  @Override
  protected final Iterable<T> decodeToIterable(List<T> decodedElements) {
    return decodedElements;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableLikeCoder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableLikeCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableLikeCoder.java new file mode 100644 index 0000000..5bb6c66 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableLikeCoder.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.gearpump.streaming.refactor.coder;

import java.io.*;
import java.util.*;

import static com.google.common.base.Preconditions.checkArgument;

/**
 * Base class for coders of iterable-like values whose elements all use one element coder.
 *
 * <p>Wire format:
 * <ul>
 *   <li>A {@link Collection} is written as a 4-byte size followed by that many elements.</li>
 *   <li>Any other {@link Iterable} is written as size {@code -1}, followed by the elements
 *   routed through a {@link BufferedElementCountingOutputStream}.</li>
 * </ul>
 * {@link #decode} reads the second form back as blocks, each prefixed by a {@link VarInt} element
 * count, until a non-positive count is read.
 *
 * @param <T> the element type
 * @param <IterableT> the iterable type produced by {@link #decodeToIterable}
 */
public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
    extends StructuredCoder<IterableT> {

  private final Coder<T> elementCoder;
  private final String iterableName;

  /**
   * @param elementCoder coder used for every element; must not be null
   * @param iterableName name used in error messages (e.g. "List"); must not be null
   * @throws IllegalArgumentException if either argument is null
   */
  protected IterableLikeCoder(Coder<T> elementCoder, String iterableName) {
    checkArgument(elementCoder != null, "element Coder for IterableLikeCoder must not be null");
    checkArgument(iterableName != null, "iterable name for IterableLikeCoder must not be null");
    this.elementCoder = elementCoder;
    this.iterableName = iterableName;
  }

  /** Returns the coder used for each element. */
  public Coder<T> getElemCoder() {
    return elementCoder;
  }

  /** Builds the value returned by {@link #decode} from the decoded elements, in encounter order. */
  protected abstract IterableT decodeToIterable(List<T> decodedElements);

  /**
   * @throws CoderException if {@code iterable} is null or writing to the stream fails
   */
  @Override
  public void encode(IterableT iterable, OutputStream outStream) {
    if (iterable == null) {
      throw new CoderException("cannot encode a null " + iterableName);
    }
    DataOutputStream dataOutStream = new DataOutputStream(outStream);
    try {
      if (iterable instanceof Collection) {
        // We can know the size of the Iterable. Use an encoding with a
        // leading size field, followed by that many elements.
        @SuppressWarnings("unchecked")
        Collection<T> collection = (Collection<T>) iterable;
        dataOutStream.writeInt(collection.size());
        for (T elem : collection) {
          elementCoder.encode(elem, dataOutStream);
        }
      } else {
        // We don't know the size without traversing it so use a fixed size buffer
        // and encode as many elements as possible into it before outputting the size followed
        // by the elements.
        dataOutStream.writeInt(-1);
        BufferedElementCountingOutputStream countingOutputStream =
            new BufferedElementCountingOutputStream(dataOutStream);
        for (T elem : iterable) {
          countingOutputStream.markElementStart();
          elementCoder.encode(elem, countingOutputStream);
        }
        countingOutputStream.finish();
      }
      // Make sure all our output gets pushed to the underlying outStream.
      dataOutStream.flush();
    } catch (IOException e) {
      // CoderException (a RuntimeException) is how every other coder reports stream failures.
      throw new CoderException(e);
    }
  }

  /**
   * @throws CoderException if reading from the stream fails (for example on truncated input)
   */
  @Override
  public IterableT decode(InputStream inStream) {
    try {
      DataInputStream dataInStream = new DataInputStream(inStream);
      int size = dataInStream.readInt();
      if (size >= 0) {
        List<T> elements = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
          elements.add(elementCoder.decode(dataInStream));
        }
        return decodeToIterable(elements);
      }
      List<T> elements = new ArrayList<>();
      // We don't know the size a priori. Check if we're done with
      // each block of elements.
      long count = VarInt.decodeLong(dataInStream);
      while (count > 0L) {
        elements.add(elementCoder.decode(dataInStream));
        --count;
        if (count == 0L) {
          count = VarInt.decodeLong(dataInStream);
        }
      }
      return decodeToIterable(elements);
    } catch (IOException e) {
      throw new CoderException(e);
    }
  }

  @Override
  public List<? extends Coder<?>> getCoderArguments() {
    return Arrays.asList(elementCoder);
  }

  /** Always fails; subclasses with a defined element order may override. */
  @Override
  public void verifyDeterministic() throws Coder.NonDeterministicException {
    throw new NonDeterministicException(this,
        "IterableLikeCoder can not guarantee deterministic ordering.");
  }

  @Override
  public boolean isRegisterByteSizeObserverCheap(
      IterableT iterable) {
    return isObservableIterable(iterable);
  }

  /**
   * Reports the estimated encoded size of {@code iterable}. Observable iterables are measured
   * lazily during their next iteration. Other iterables are traversed immediately.
   *
   * @throws CoderException if {@code iterable} is null
   */
  @Override
  public void registerByteSizeObserver(
      IterableT iterable, ElementByteSizeObserver observer) {
    if (iterable == null) {
      throw new CoderException("cannot encode a null Iterable");
    }

    if (isObservableIterable(iterable)) {
      observer.setLazy();
      addIteratorObserver(iterable, new IteratorObserver(observer, iterable instanceof Collection));
    } else if (iterable instanceof Collection) {
      // We can know the size of the Iterable. Use an encoding with a
      // leading size field, followed by that many elements.
      @SuppressWarnings("unchecked")
      Collection<T> collection = (Collection<T>) iterable;
      observer.update(4L);
      for (T elem : collection) {
        elementCoder.registerByteSizeObserver(elem, observer);
      }
    } else {
      // TODO: (BEAM-1537) Update to use an accurate count depending on size and count,
      // currently we are under estimating the size by up to 10 bytes per block of data since we
      // are not encoding the count prefix which occurs at most once per 64k of data and is upto
      // 10 bytes long. Since we include the total count we can upper bound the underestimate
      // to be 10 / 65536 ~= 0.0153% of the actual size.
      observer.update(4L);
      long count = 0;
      for (T elem : iterable) {
        count += 1;
        elementCoder.registerByteSizeObserver(elem, observer);
      }
      if (count > 0) {
        // Update the length based upon the number of counted elements, this helps
        // eliminate the case where all the elements are encoded in the first block and
        // it is quite short (e.g. Long.MAX_VALUE nulls encoded with VoidCoder).
        observer.update(VarInt.getLength(count));
      }
      // Update with the terminator byte.
      observer.update(1L);
    }
  }

  /**
   * Returns whether {@code iterable} supports lazy size observation. It must extend either the
   * nested {@link ElementByteSizeObservableIterable} or the package-level class of the same name.
   * Inside this class the nested type shadows the package-level one, so checking only the simple
   * name would miss the latter.
   */
  private static boolean isObservableIterable(Iterable<?> iterable) {
    return iterable instanceof ElementByteSizeObservableIterable
        || iterable
            instanceof org.apache.gearpump.streaming.refactor.coder.ElementByteSizeObservableIterable;
  }

  /** Attaches {@code observer} to an iterable for which {@link #isObservableIterable} is true. */
  private static void addIteratorObserver(Iterable<?> iterable, Observer observer) {
    if (iterable instanceof ElementByteSizeObservableIterable) {
      ((ElementByteSizeObservableIterable<?, ?>) iterable).addObserver(observer);
    } else {
      ((org.apache.gearpump.streaming.refactor.coder.ElementByteSizeObservableIterable<?, ?>)
          iterable).addObserver(observer);
    }
  }

  /**
   * Nested copy of the package-level {@code ElementByteSizeObservableIterable}: observers added
   * here are attached to the next iterator returned by {@link #iterator()}, then forgotten.
   */
  public static abstract class ElementByteSizeObservableIterable<
      V, InputT extends ElementByteSizeObservableIterator<V>>
      implements Iterable<V> {
    private List<Observer> observers = new ArrayList<>();

    protected abstract InputT createIterator();

    public void addObserver(Observer observer) {
      observers.add(observer);
    }

    @Override
    public InputT iterator() {
      InputT iterator = createIterator();
      for (Observer observer : observers) {
        iterator.addObserver(observer);
      }
      observers.clear();
      return iterator;
    }
  }

  /**
   * Nested copy of the package-level {@code ElementByteSizeObservableIterator}: an observable
   * iterator that pushes per-element byte sizes to its observers.
   */
  public static abstract class ElementByteSizeObservableIterator<V>
      extends Observable implements Iterator<V> {
    protected final void notifyValueReturned(long byteSize) {
      setChanged();
      notifyObservers(byteSize);
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IteratorObserver.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IteratorObserver.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IteratorObserver.java new file mode 100644 index 0000000..f1be5fb --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IteratorObserver.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.gearpump.streaming.refactor.coder;

import java.util.Observable;
import java.util.Observer;

/**
 * {@link Observer} that forwards per-element byte sizes from an observable iterator to an
 * {@link ElementByteSizeObserver}, adding the framing overhead it assumes for the iterable
 * encoding.
 *
 * <p>NOTE(review): the overhead model assumes 1-byte "hasNext" flags. The non-Collection
 * encoding in IterableLikeCoder instead writes VarInt-prefixed element blocks. Confirm that
 * this estimate is still the intended one.
 */
public class IteratorObserver implements Observer {
  private final ElementByteSizeObserver outerObserver;
  private final boolean countable;

  /**
   * Immediately charges the fixed overhead to {@code outerObserver}: 4 bytes when
   * {@code countable}, otherwise 5 bytes.
   *
   * @param outerObserver observer that receives all sizes
   * @param countable whether the element count is written up front (IterableLikeCoder passes
   *     {@code iterable instanceof Collection})
   */
  public IteratorObserver(ElementByteSizeObserver outerObserver,
      boolean countable) {
    this.outerObserver = outerObserver;
    this.countable = countable;

    if (countable) {
      // Additional 4 bytes are due to size.
      outerObserver.update(4L);
    } else {
      // Additional 5 bytes are due to size = -1 (4 bytes) and
      // hasNext = false (1 byte).
      outerObserver.update(5L);
    }
  }

  /**
   * Forwards a Long element size, adding one byte per element when not countable.
   *
   * @throws AssertionError if {@code obj} is not a Long
   */
  @Override
  public void update(Observable obs, Object obj) {
    if (!(obj instanceof Long)) {
      throw new AssertionError("unexpected parameter object");
    }

    if (countable) {
      outerObserver.update(obs, obj);
    } else {
      // Additional 1 byte is due to hasNext = true flag.
      outerObserver.update(obs, 1 + (long) obj);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ListCoder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ListCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ListCoder.java new file mode 100644 index 0000000..3858ec6 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ListCoder.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.gearpump.streaming.refactor.coder;

import java.util.List;

/**
 * Coder for {@link List} values. The wire format comes from {@link IterableLikeCoder}; decoding
 * returns the decoded element list directly.
 */
public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {

  /** Returns a coder for lists whose elements are encoded with {@code elemCoder}. */
  public static <T> ListCoder<T> of(Coder<T> elemCoder) {
    return new ListCoder<>(elemCoder);
  }

  /////////////////////////////////////////////////////////////////////////////
  // Internal operations below here.

  @Override
  protected final List<T> decodeToIterable(List<T> decodedElements) {
    return decodedElements;
  }

  protected ListCoder(Coder<T> elemCoder) {
    super(elemCoder, "List");
  }

  /**
   * Succeeds exactly when the element coder's {@code verifyDeterministic()} succeeds.
   *
   * @throws NonDeterministicException wrapping the element coder's failure
   */
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    // Use the varargs overload. The previous cast of the element coder to Iterable<Coder<?>>
    // compiled, but threw ClassCastException at runtime for any coder that is not an Iterable.
    verifyDeterministic(this, "ListCoder.elemCoder must be deterministic", getElemCoder());
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/MapCoder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/MapCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/MapCoder.java new file mode 100644 index 0000000..66b983c --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/MapCoder.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gearpump.streaming.refactor.coder; + +import com.google.common.collect.Maps; + +import java.io.*; +import java.util.*; + +public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> { + + public static <K, V> MapCoder<K, V> of( + Coder<K> keyCoder, + Coder<V> valueCoder) { + return new MapCoder<>(keyCoder, valueCoder); + } + + public Coder<K> getKeyCoder() { + return keyCoder; + } + + public Coder<V> getValueCoder() { + return valueCoder; + } + + ///////////////////////////////////////////////////////////////////////////// + + private Coder<K> keyCoder; + private Coder<V> valueCoder; + + private MapCoder(Coder<K> keyCoder, Coder<V> valueCoder) { + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + } + + @Override + public void encode(Map<K, V> map, OutputStream outStream) throws CoderException { + if (map == null) { + throw new CoderException("cannot encode a null Map"); + } + DataOutputStream dataOutStream = new DataOutputStream(outStream); + + int size = map.size(); + try { + dataOutStream.writeInt(size); + } catch (IOException e) { + throw new RuntimeException(e); + } + if (size == 0) { + return; + } + + // Since we handled size == 0 above, entry is guaranteed to exist before and after loop + Iterator<Map.Entry<K, V>> iterator = map.entrySet().iterator(); + Map.Entry<K, V> entry = iterator.next(); + while (iterator.hasNext()) { + keyCoder.encode(entry.getKey(), outStream); + valueCoder.encode(entry.getValue(), outStream); + entry = iterator.next(); + } + + keyCoder.encode(entry.getKey(), outStream); + valueCoder.encode(entry.getValue(), outStream); + // no flush needed as DataOutputStream does not buffer + } + + @Override + public Map<K, V> decode(InputStream inStream) { + DataInputStream dataInStream = new DataInputStream(inStream); + int size = 0; + try { + size = dataInStream.readInt(); + } catch (IOException e) { + throw new RuntimeException(e); + } + if (size == 0) { + return Collections.emptyMap(); + } + + Map<K, V> 
retval = Maps.newHashMapWithExpectedSize(size); + for (int i = 0; i < size - 1; ++i) { + K key = keyCoder.decode(inStream); + V value = valueCoder.decode(inStream); + retval.put(key, value); + } + + K key = keyCoder.decode(inStream); + V value = valueCoder.decode(inStream); + retval.put(key, value); + return retval; + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Arrays.asList(keyCoder, valueCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + throw new NonDeterministicException(this, + "Ordering of entries in a Map may be non-deterministic."); + } + + @Override + public void registerByteSizeObserver( + Map<K, V> map, ElementByteSizeObserver observer) { + observer.update(4L); + if (map.isEmpty()) { + return; + } + Iterator<Map.Entry<K, V>> entries = map.entrySet().iterator(); + Map.Entry<K, V> entry = entries.next(); + while (entries.hasNext()) { + keyCoder.registerByteSizeObserver(entry.getKey(), observer); + valueCoder.registerByteSizeObserver(entry.getValue(), observer); + entry = entries.next(); + } + keyCoder.registerByteSizeObserver(entry.getKey(), observer); + valueCoder.registerByteSizeObserver(entry.getValue(), observer); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/SetCoder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/SetCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/SetCoder.java new file mode 100644 index 0000000..4147732 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/SetCoder.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.coder; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> { + + public static <T> SetCoder<T> of(Coder<T> elementCoder) { + return new SetCoder<>(elementCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + throw new NonDeterministicException(this, + "Ordering of elements in a set may be non-deterministic."); + } + + ///////////////////////////////////////////////////////////////////////////// + // Internal operations below here. 
+ + @Override + protected final Set<T> decodeToIterable(List<T> decodedElements) { + return new HashSet<>(decodedElements); + } + + protected SetCoder(Coder<T> elemCoder) { + super(elemCoder, "Set"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StringUtf8Coder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StringUtf8Coder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StringUtf8Coder.java new file mode 100644 index 0000000..73ea8eb --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StringUtf8Coder.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.refactor.coder; + +import com.google.common.base.Utf8; + +import java.io.*; +import java.nio.charset.StandardCharsets; + +public class StringUtf8Coder extends AtomicCoder<String> { + + public static StringUtf8Coder of() { + return INSTANCE; + } + + ///////////////////////////////////////////////////////////////////////////// + + private static final StringUtf8Coder INSTANCE = new StringUtf8Coder(); + + private static void writeString(String value, DataOutputStream dos) + throws IOException { + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + VarInt.encode(bytes.length, dos); + dos.write(bytes); + } + + private static String readString(DataInputStream dis) throws IOException { + int len = VarInt.decodeInt(dis); + if (len < 0) { + throw new CoderException("Invalid encoded string length: " + len); + } + byte[] bytes = new byte[len]; + dis.readFully(bytes); + return new String(bytes, StandardCharsets.UTF_8); + } + + private StringUtf8Coder() { + } + + @Override + public void encode(String value, OutputStream outStream) throws CoderException { + if (value == null) { + throw new CoderException("cannot encode a null String"); + } + try { + writeString(value, new DataOutputStream(outStream)); + } catch (IOException e) { + throw new CoderException(e); + } + } + + @Override + public String decode(InputStream inStream) + throws CoderException { + try { + return readString(new DataInputStream(inStream)); + } catch (EOFException | UTFDataFormatException exn) { + // These exceptions correspond to decoding problems, so change + // what kind of exception they're branded as. 
+ throw new CoderException(exn); + } catch (Exception e) { + throw new CoderException(e); + } + } + + @Override + public void verifyDeterministic() { + } + + @Override + public boolean consistentWithEquals() { + return true; + } + + @Override + public long getEncodedElementByteSize(String value) { + if (value == null) { + throw new CoderException("cannot encode a null String"); + } + int size = Utf8.encodedLength(value); + return VarInt.getLength(size) + size; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuralByteArray.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuralByteArray.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuralByteArray.java new file mode 100644 index 0000000..6a371f6 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuralByteArray.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.io.BaseEncoding;
+
+import java.util.Arrays;
+
+/** Wraps a byte array so that equality and hashing are based on its contents. */
+public class StructuralByteArray {
+  // Package-private and mutable; the array passed to the constructor is stored, not copied.
+  byte[] value;
+
+  public StructuralByteArray(byte[] value) {
+    this.value = value;
+  }
+
+  /** Returns the wrapped array itself (no defensive copy). */
+  public byte[] getValue() {
+    return value;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    // Content comparison: equal iff other is a StructuralByteArray holding the same bytes.
+    return other instanceof StructuralByteArray
+        && Arrays.equals(value, ((StructuralByteArray) other).value);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(value);
+  }
+
+  @Override
+  public String toString() {
+    return "base64:" + BaseEncoding.base64().encode(value);
+  }
+}
