http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java new file mode 100644 index 0000000..57638d8 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.apex.malhar.lib.state.managed.Bucket; + +import com.esotericsoftware.kryo.io.Input; + +import com.datatorrent.netlet.util.Slice; + +/** + * All spillable data structures use this class to manage the buffers for serialization. + * This class contains serialization logic that is common for all spillable data structures + * + * @param <K> + * @param <V> + */ +public class AffixKeyValueSerdeManager<K, V> extends KeyValueSerdeManager<K, V> +{ + /** + * The read buffer will be released when read is done, while write buffer should be held until the data has been persisted. + * The write buffer should be non-transient. The data which has been already saved to files will be removed by {@link Bucket} + * while the data which haven't been saved need to be recovered by the platform from checkpoint. + */ + private AffixSerde<K> metaKeySerde; + private AffixSerde<K> dataKeySerde; + + + private AffixKeyValueSerdeManager() + { + //for kyro + } + + public AffixKeyValueSerdeManager(byte[] metaKeySuffix, byte[] dataKeyIdentifier, Serde<K> keySerde, Serde<V> valueSerde) + { + this.valueSerde = valueSerde; + metaKeySerde = new AffixSerde<K>(null, keySerde, metaKeySuffix); + dataKeySerde = new AffixSerde<K>(dataKeyIdentifier, keySerde, null); + } + + public Slice serializeMetaKey(K key, boolean write) + { + SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead; + metaKeySerde.serialize(key, buffer); + return buffer.toSlice(); + } + + public Slice serializeDataKey(K key, boolean write) + { + SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead; + dataKeySerde.serialize(key, buffer); + return buffer.toSlice(); + } + + public V deserializeValue(Input input) + { + V value = valueSerde.deserialize(input); + return value; + } +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java new file mode 100644 index 0000000..7504633 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * AffixSerde provides serde for adding prefix or suffix + * + * @param <T> + */ +public class AffixSerde<T> implements Serde<T> +{ + private Serde<T> serde; + private byte[] prefix; + private byte[] suffix; + + private AffixSerde() + { + //kyro + } + + public AffixSerde(byte[] prefix, Serde<T> serde, byte[] suffix) + { + this.prefix = prefix; + this.suffix = suffix; + this.serde = serde; + } + + @Override + public void serialize(T object, Output output) + { + if (prefix != null && prefix.length > 0) { + output.write(prefix); + } + serde.serialize(object, output); + if (suffix != null && suffix.length > 0) { + output.write(suffix); + } + } + + @Override + public T deserialize(Input input) + { + if (prefix != null && prefix.length > 0) { + input.skip(prefix.length); + } + return serde.deserialize(input); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java new file mode 100644 index 0000000..4b2a45b --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.lang.reflect.Array; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.base.Preconditions; + +public class ArraySerde<T> implements Serde<T[]> +{ + private Serde<T> itemSerde; + private Class<T> itemType; + + private ArraySerde() + { + } + + /** + * Serializer and Deserializer need different constructor, so use static factory method to wrap. + * The ArraySerde returned by newSerializer can only used for serialization + */ + public static <T> ArraySerde<T> newSerializer(Serde<T> itemSerde) + { + return new ArraySerde<T>(Preconditions.checkNotNull(itemSerde)); + } + + public static <T> ArraySerde<T> newSerde(Serde<T> itemSerde, Class<T> itemType) + { + return new ArraySerde<T>(Preconditions.checkNotNull(itemSerde), Preconditions.checkNotNull(itemType)); + } + + private ArraySerde(Serde<T> itemSerde) + { + this.itemSerde = itemSerde; + } + + private ArraySerde(Serde<T> itemSerde, Class<T> itemType) + { + this.itemSerde = itemSerde; + this.itemType = itemType; + } + + @Override + public void serialize(T[] objects, Output output) + { + if (objects.length == 0) { + return; + } + output.writeInt(objects.length, true); + Serde<T> serializer = getItemSerde(); + for (T object : objects) { + serializer.serialize(object, output); + } + } + + protected Serde<T> getItemSerde() + { + return itemSerde; + } + + @Override + public T[] deserialize(Input input) + { + int numOfElements = input.readInt(true); + + T[] array = createObjectArray(numOfElements); + + for (int index = 0; index < numOfElements; ++index) { + array[index] = getItemSerde().deserialize(input); + } + return array; + } + + @SuppressWarnings("unchecked") + protected T[] createObjectArray(int length) + { + return (T[])Array.newInstance(itemType, length); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java new file mode 100644 index 0000000..c140962 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.netlet.util.Slice; + +/** + * + * keep the information of one block + * + */ +public class Block +{ + public static class OutOfBlockBufferMemoryException extends RuntimeException + { + private static final long serialVersionUID = 3813792889200989131L; + } + + private static final Logger logger = LoggerFactory.getLogger(Block.class); + + public static final int DEFAULT_BLOCK_SIZE = 100000; + + //the capacity of the block + private int capacity; + + /* + * the size of the data. + */ + private volatile int size; + + private int objectBeginOffset = 0; + private byte[] buffer; + + /** + * whether any slices have been exposed to the caller. + */ + private boolean exposedSlices; + + private Block() + { + this(DEFAULT_BLOCK_SIZE); + } + + public Block(int capacity) + { + if (capacity <= 0) { + throw new IllegalArgumentException("Invalid capacity: " + capacity); + } + buffer = new byte[capacity]; + this.capacity = capacity; + } + + public void write(byte data) + { + checkOrReallocateBuffer(1); + buffer[size++] = data; + } + + public void write(byte[] data) + { + write(data, 0, data.length); + } + + public void write(byte[] data, final int offset, final int length) + { + checkOrReallocateBuffer(length); + + System.arraycopy(data, offset, buffer, size, length); + size += length; + } + + + + /** + * check the buffer size and reallocate if buffer is not enough + * + * @param length + */ + private void checkOrReallocateBuffer(int length) throws OutOfBlockBufferMemoryException + { + if (size + length <= capacity) { + return; + } + + if (exposedSlices) { + throw new OutOfBlockBufferMemoryException(); + } + + //calculate the new capacity + capacity = (size + length) * 2; + + byte[] oldBuffer = buffer; + buffer = new byte[capacity]; + + /** + * no slices are exposed in this block yet (this is the first object in this block). + * so we can reallocate and move the memory + */ + if (size > 0) { + System.arraycopy(oldBuffer, 0, buffer, 0, size); + } + } + + /** + * Similar to toSlice, this method is used to get the information of the + * object regards the data already write to buffer. But unlike toSlice() which + * indicates all the writes of this object are already done, this method can be called at + * any time + */ + public Slice getLastObjectSlice() + { + return new Slice(buffer, objectBeginOffset, size - objectBeginOffset); + } + + public void discardLastObjectData() + { + if (objectBeginOffset == 0) { + return; + } + size = objectBeginOffset; + } + + public void moveLastObjectDataTo(Block newBlock) + { + if (size > objectBeginOffset) { + newBlock.write(buffer, objectBeginOffset, size - objectBeginOffset); + discardLastObjectData(); + } + } + + /** + * This method returns the slice that represents the serialized form. + * The process of serializing an object should be one or multiple calls of write() followed by a toSlice() call. + * A call to toSlice indicates the writes are done for this object + * + * @return + */ + public BufferSlice toSlice() + { + if (size == objectBeginOffset) { + throw new RuntimeException("data size is zero."); + } + BufferSlice slice = new BufferSlice(buffer, objectBeginOffset, size - objectBeginOffset); + //prepare for next object + objectBeginOffset = size; + exposedSlices = true; + return slice; + } + + public void reset() + { + size = 0; + objectBeginOffset = 0; + exposedSlices = false; + } + + /** + * check if the block has enough space for the length + * + * @param length + * @return + */ + public boolean hasEnoughSpace(int length) + { + return size + length < capacity; + } + + public long size() + { + return size; + } + + public long capacity() + { + return capacity; + } + + public boolean isFresh() + { + return (size == 0 && objectBeginOffset == 0 && exposedSlices == false); + } + + /** + * Returns whether the block is clear. The block is clear when there has not been any write calls since the last toSlice() call. + * + * @return + */ + public boolean isClear() + { + return objectBeginOffset == size; + } + + public void release() + { + reset(); + buffer = null; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java new file mode 100644 index 0000000..f8a097e --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +/** + * The process of interface would be: + * - Stream keep on reporting how many free blocks it has in certain frequent. usually at the end of each window + * - Stream check how many block should release. Stream usually release the blocks but Stream can make its own decision + * - Stream report how many blocks actually released + */ +public interface BlockReleaseStrategy +{ + /** + * The stream should call this method to report to the strategy how many blocks are free currently. + * @param freeBlockNum + */ + void currentFreeBlocks(int freeBlockNum); + + /** + * Get how many blocks can be released + * @return + */ + int getNumBlocksToRelease(); + + /** + * The stream should call this method to report how many block are released. + * @param numReleasedBlocks + */ + void releasedBlocks(int numReleasedBlocks); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java new file mode 100644 index 0000000..ee50f7d --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.io.OutputStream; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +import com.datatorrent.netlet.util.Slice; + +/** + * A stream is a collection of blocks + * BlockStream avoids copying the data that are already exposed to the caller + * + */ +public class BlockStream extends OutputStream +{ + private static final Logger logger = LoggerFactory.getLogger(BlockStream.class); + + //the initial capacity of each block + protected final int blockCapacity; + + protected Map<Integer, Block> blocks = Maps.newHashMap(); + //the index of current block, valid block index should >= 0 + protected int currentBlockIndex = 0; + protected long size = 0; + + protected Block currentBlock; + + public BlockStream() + { + this(Block.DEFAULT_BLOCK_SIZE); + } + + public BlockStream(int blockCapacity) + { + this.blockCapacity = blockCapacity; + } + + @Override + public void write(byte[] data) + { + write(data, 0, data.length); + } + + @Override + public void write(int b) + { + currentBlock = getOrCreateCurrentBlock(); + try { + currentBlock.write((byte)b); + } catch (Block.OutOfBlockBufferMemoryException e) { + reallocateBlock(); + currentBlock.write((byte)b); + } + size++; + } + + /** + * This write could be called multiple times for an object. + * The write method makes sure the same object only write to one block + * + * @param data + * @param offset + * @param length + */ + @Override + public void write(byte[] data, final int offset, final int length) + { + //start with a block which at least can hold this data + currentBlock = getOrCreateCurrentBlock(); + try { + currentBlock.write(data, offset, length); + } catch (Block.OutOfBlockBufferMemoryException e) { + reallocateBlock(); + currentBlock.write(data, offset, length); + } + size += length; + } + + private void reallocateBlock() + { + //use next block + Block previousBlock = moveToNextBlock(); + if (!currentBlock.isFresh()) { + throw new RuntimeException("New block is not fresh."); + } + if (!previousBlock.isClear()) { + previousBlock.moveLastObjectDataTo(currentBlock); + } + } + + /** + * + * @return The previous block + */ + protected Block moveToNextBlock() + { + Block previousBlock = currentBlock; + + ++currentBlockIndex; + currentBlock = getOrCreateCurrentBlock(); + if (!currentBlock.isFresh()) { + throw new RuntimeException("Assigned non fresh block."); + } + return previousBlock; + } + + protected Block getOrCreateCurrentBlock() + { + Block block = blocks.get(currentBlockIndex); + if (block == null) { + block = new Block(blockCapacity); + blocks.put(currentBlockIndex, block); + } + return block; + } + + public long size() + { + return size; + } + + public long capacity() + { + long capacity = 0; + for (Block block : blocks.values()) { + capacity += block.capacity(); + } + return capacity; + } + + /** + * + * this is the call that represents the end of an object + */ + public Slice toSlice() + { + return blocks.get(currentBlockIndex).toSlice(); + } + + /** + * resets all blocks + */ + public void reset() + { + currentBlockIndex = 0; + size = 0; + for (Block block : blocks.values()) { + block.reset(); + } + } + + public void release() + { + reset(); + blocks.clear(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java new file mode 100644 index 0000000..5d830fe --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.getopt.util.hash.MurmurHash; + +import org.apache.commons.lang3.ArrayUtils; + +import com.datatorrent.netlet.util.Slice; + +/** + * com.datatorrent.netlet.util.Slice has problem with the hashCode(), so + * override here + * + */ +public class BufferSlice extends Slice +{ + private static final long serialVersionUID = -471209532589983329L; + public static final BufferSlice EMPTY_SLICE = new BufferSlice(ArrayUtils.EMPTY_BYTE_ARRAY); + + //for kyro + private BufferSlice() + { + //the super class's default constructor is private and can't called. + super(null, 0, 0); + } + + public BufferSlice(byte[] array, int offset, int length) + { + super(array, offset, length); + } + + public BufferSlice(byte[] array) + { + super(array); + } + + public BufferSlice(Slice netletSlice) + { + this(netletSlice.buffer, netletSlice.offset, netletSlice.length); + } + + @Override + public int hashCode() + { + int hash = 5; + hash = 59 * hash + MurmurHash.hash(buffer, hash, offset, length); + hash = 59 * hash + this.length; + return hash; + } + + /** + * let this class equals with com.datatorrent.netlet.util.Slice + */ + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + if (!Slice.class.isAssignableFrom(obj.getClass())) { + return false; + } + final Slice other = (Slice)obj; + if (this.length != other.length) { + return false; + } + + final int offset1 = this.offset; + final byte[] buffer1 = this.buffer; + int i = offset1 + this.length; + + final byte[] buffer2 = other.buffer; + int j = other.offset + other.length; + + while (i-- > offset1) { + if (buffer1[i] != buffer2[--j]) { + return false; + } + } + + return true; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java new file mode 100644 index 0000000..bcd0b74 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.Collection; + +import javax.validation.constraints.NotNull; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; + +/** + * This is an implementation of {@link Serde} which serializes and deserializes lists. + * + * @since 3.5.0 + */ [email protected] +public class CollectionSerde<T, CollectionT extends Collection<T>> implements Serde<CollectionT> +{ + @NotNull + private Serde<T> serde; + + @NotNull + private Class<? extends CollectionT> collectionClass; + + private CollectionSerde() + { + // for Kryo + } + + /** + * Creates a {@link CollectionSerde}. + * @param serde The {@link Serde} that is used to serialize and deserialize each element of a list. + */ + public CollectionSerde(@NotNull Serde<T> serde, @NotNull Class<? extends CollectionT> collectionClass /*Class<? extends C1> collectionClass*/ ) + { + this.serde = Preconditions.checkNotNull(serde); + this.collectionClass = Preconditions.checkNotNull(collectionClass); + } + + @Override + public void serialize(CollectionT objects, Output output) + { + if (objects.size() == 0) { + return; + } + output.writeInt(objects.size(), true); + Serde<T> serializer = getItemSerde(); + for (T object : objects) { + serializer.serialize(object, output); + } + } + + @Override + public CollectionT deserialize(Input input) + { + int numElements = input.readInt(true); + + try { + CollectionT collection = collectionClass.newInstance(); + + for (int index = 0; index < numElements; index++) { + T object = serde.deserialize(input); + collection.add(object); + } + + return collection; + } catch (Exception ex) { + throw Throwables.propagate(ex); + } + } + + protected Serde<T> getItemSerde() + { + return serde; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java new file mode 100644 index 0000000..93929e4 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.Arrays; + +import org.apache.commons.collections.buffer.CircularFifoBuffer; + +/** + * This implementation get the minimum number of free blocks in the period to release. + * + */ +public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy +{ + public static final int DEFAULT_PERIOD = 60; // 60 reports + private CircularFifoBuffer freeBlockNumQueue; + private Integer[] tmpArray; + + public DefaultBlockReleaseStrategy() + { + this(DEFAULT_PERIOD); + } + + public DefaultBlockReleaseStrategy(int period) + { + freeBlockNumQueue = new CircularFifoBuffer(period); + tmpArray = new Integer[period]; + Arrays.fill(tmpArray, 0); + } + + /** + * The stream calls this to report to the strategy how many blocks are free currently. + * @param freeBlockNum + */ + @Override + public void currentFreeBlocks(int freeBlockNum) + { + if (freeBlockNum < 0) { + throw new IllegalArgumentException("The number of free blocks could not less than zero."); + } + freeBlockNumQueue.add(freeBlockNum); + } + + /** + * Get how many blocks that can be released + * @return + */ + @Override + public int getNumBlocksToRelease() + { + int minNum = Integer.MAX_VALUE; + for (Object num : freeBlockNumQueue) { + minNum = Math.min((Integer)num, minNum); + } + return minNum; + } + + + /** + * report how many blocks that have been released. + * @param numReleasedBlocks + */ + @Override + public void releasedBlocks(int numReleasedBlocks) + { + if (numReleasedBlocks == 0) { + return; + } + if (numReleasedBlocks < 0) { + throw new IllegalArgumentException("Num of released blocks should not be negative"); + } + /** + * decrease by released blocks + */ + for (Object num : freeBlockNumQueue) { + freeBlockNumQueue.add(Math.max((Integer)num - numReleasedBlocks, 0)); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java new file mode 100644 index 0000000..0fbb2ab --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because + * using Kryo makes the object being serialized rigid, meaning you won't be able to make backward compatible or + * incompatible changes to the class being serialized. + * + * @param <T> The type being serialized + */ [email protected] +public class GenericSerde<T> implements Serde<T> +{ + private transient ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() + { + @Override + public Kryo get() + { + return new Kryo(); + } + }; + + private final Class<? extends T> clazz; + + public GenericSerde() + { + this.clazz = null; + } + + public GenericSerde(Class<? extends T> clazz) + { + this.clazz = clazz; + } + + @Override + public void serialize(T object, Output output) + { + Kryo kryo = kryos.get(); + if (clazz == null) { + kryo.writeClassAndObject(output, object); + } else { + kryo.writeObject(output, object); + } + } + + @Override + public T deserialize(Input input) + { + T object; + Kryo kryo = kryos.get(); + if (clazz == null) { + object = (T)kryo.readClassAndObject(input); + } else { + object = kryo.readObject(input, clazz); + } + return object; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java new file mode 100644 index 0000000..032b5e0 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * This is an implementation of {@link Serde} which deserializes and serializes integers. + * + * @since 3.5.0 + */ [email protected] +public class IntSerde implements Serde<Integer> +{ + @Override + public void serialize(Integer value, Output output) + { + output.writeInt(value); + } + + @Override + public Integer deserialize(Input input) + { + return input.readInt(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java new file mode 100644 index 0000000..a7dfa7f --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +/** + * This interface provides methods for stream for key/value. + * The implementation can separate the stream for key and value or share the same one. + * + */ +public interface KeyValueByteStreamProvider +{ + /** + * @return The stream for keeping key + */ + WindowedBlockStream getKeyStream(); + + /** + * @return The stream for keeping value + */ + WindowedBlockStream getValueStream(); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java new file mode 100644 index 0000000..6fbe9fe --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.apex.malhar.lib.state.managed.Bucket; +import org.apache.apex.malhar.lib.state.managed.BucketProvider; + +import com.datatorrent.netlet.util.Slice; + +public class KeyValueSerdeManager<K, V> +{ + protected Serde<K> keySerde; + protected Serde<V> valueSerde; + + protected SerializationBuffer keyBufferForWrite; + protected transient SerializationBuffer keyBufferForRead = SerializationBuffer.READ_BUFFER; + + protected SerializationBuffer valueBuffer; + + + protected KeyValueSerdeManager() + { + //for kyro + } + + public KeyValueSerdeManager(Serde<K> keySerde, Serde<V> valueSerde) + { + this.keySerde = keySerde; + this.valueSerde = valueSerde; + } + + public void setup(BucketProvider bp, long bucketId) + { + //the bucket will not change for this class. so get streams from setup, else, need to set stream before serialize + Bucket bucketInst = bp.ensureBucket(bucketId); + this.valueBuffer = new SerializationBuffer(bucketInst.getValueStream()); + + keyBufferForWrite = new SerializationBuffer(bucketInst.getKeyStream()); + } + + public Slice serializeKey(K key, boolean write) + { + SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead; + keySerde.serialize(key, buffer); + return buffer.toSlice(); + } + + + /** + * Value only serialize for writing + * @param value + * @return + */ + public Slice serializeValue(V value) + { + valueSerde.serialize(value, valueBuffer); + return valueBuffer.toSlice(); + } + + public void beginWindow(long windowId) + { + keyBufferForWrite.beginWindow(windowId); + valueBuffer.beginWindow(windowId); + } + + public void resetReadBuffer() + { + keyBufferForRead.release(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java new file mode 100644 index 0000000..0b63737 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * This is an implementation of {@link Serde} which deserializes and serializes integers. + * + * @since 3.5.0 + */ [email protected] +public class LongSerde implements Serde<Long> +{ + @Override + public void serialize(Long value, Output output) + { + output.writeLong(value); + } + + @Override + public Long deserialize(Input input) + { + return input.readLong(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java new file mode 100644 index 0000000..3190880 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import javax.validation.constraints.NotNull; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.base.Preconditions; + +/** + * This is an implementation of {@link Serde} which serializes and deserializes pairs. + */ [email protected] +public class PairSerde<T1, T2> implements Serde<Pair<T1, T2>> +{ + @NotNull + private Serde<T1> serde1; + @NotNull + private Serde<T2> serde2; + + private PairSerde() + { + // for Kryo + } + + /** + * Creates a {@link PairSerde}. + * @param serde1 The {@link Serde} that is used to serialize and deserialize first element of a pair + * @param serde2 The {@link Serde} that is used to serialize and deserialize second element of a pair + */ + public PairSerde(@NotNull Serde<T1> serde1, @NotNull Serde<T2> serde2) + { + this.serde1 = Preconditions.checkNotNull(serde1); + this.serde2 = Preconditions.checkNotNull(serde2); + } + + @Override + public void serialize(Pair<T1, T2> pair, Output output) + { + serde1.serialize(pair.getLeft(), output); + serde2.serialize(pair.getRight(), output); + } + + @Override + public Pair<T1, T2> deserialize(Input input) + { + T1 first = serde1.deserialize(input); + T2 second = serde2.deserialize(input); + return new ImmutablePair<>(first, second); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java deleted file mode 100644 index 9669981..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.utils.serde; - -import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * This is a simple pass through {@link Serde}. When serialization is performed the input byte array is returned. - * Similarly when deserialization is performed the input byte array is returned. - * - * @since 3.4.0 - */ [email protected] -public class PassThruByteArraySerde implements Serde<byte[], byte[]> -{ - @Override - public byte[] serialize(byte[] object) - { - return object; - } - - @Override - public byte[] deserialize(byte[] object, MutableInt offset) - { - offset.add(object.length); - return object; - } - - @Override - public byte[] deserialize(byte[] object) - { - return object; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java deleted file mode 100644 index b22bf6f..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.utils.serde; - -import org.apache.commons.lang3.mutable.MutableInt; - -import com.datatorrent.netlet.util.Slice; - -/** - * This is a simple {@link Serde} which serializes and deserializes byte arrays to {@link Slice}s. A byte array is - * serialized by simply wrapping it in a {@link Slice} object and deserialized by simply reading the byte array - * out of the {@link Slice} object. - * - * <b>Note:</b> The deserialized method doesn't use the offset argument in this implementation. - * - * @since 3.5.0 - */ -public class PassThruByteArraySliceSerde implements Serde<byte[], Slice> -{ - @Override - public Slice serialize(byte[] object) - { - return new Slice(object); - } - - @Override - public byte[] deserialize(Slice object, MutableInt offset) - { - offset.add(object.length); - - if (object.offset == 0) { - return object.buffer; - } - - byte[] bytes = new byte[object.length]; - System.arraycopy(object.buffer, object.offset, bytes, 0, object.length); - return bytes; - } - - @Override - public byte[] deserialize(Slice object) - { - return deserialize(object, new MutableInt(0)); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java index 2646c0e..679e116 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java @@ -18,9 +18,14 @@ */ package org.apache.apex.malhar.lib.utils.serde; -import org.apache.commons.lang3.mutable.MutableInt; +import java.io.IOException; + import org.apache.hadoop.classification.InterfaceStability; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.base.Throwables; + import com.datatorrent.netlet.util.Slice; /** @@ -30,23 +35,26 @@ import com.datatorrent.netlet.util.Slice; * @since 3.5.0 */ @InterfaceStability.Evolving -public class PassThruSliceSerde implements Serde<Slice, Slice> +public class PassThruSliceSerde implements Serde<Slice> { @Override - public Slice serialize(Slice object) - { - return object; - } - - @Override - public Slice deserialize(Slice object, MutableInt offset) + public void serialize(Slice slice, Output output) { - return object; + output.write(slice.buffer, slice.offset, slice.length); } @Override - public Slice deserialize(Slice object) + public Slice deserialize(Input input) { - return object; + if (input.getInputStream() != null) { + // The input is backed by a stream, cannot directly use its internal buffer + try { + return new Slice(input.readBytes(input.available())); + } catch (IOException ex) { + throw Throwables.propagate(ex); + } + } else { + return new Slice(input.getBuffer(), input.position(), input.limit() - input.position()); + } } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java index 6e02aee..d09612d 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java @@ -18,46 +18,29 @@ */ package org.apache.apex.malhar.lib.utils.serde; -import org.apache.commons.lang3.mutable.MutableInt; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; /** * This is an interface for a Serializer/Deserializer class. - * @param <OBJ> The type of the object to Serialize and Deserialize. - * @param <SER> The type to Serialize an Object to. + * @param <T> The type of the object to Serialize and Deserialize. * * @since 3.4.0 */ -public interface Serde<OBJ, SER> +public interface Serde<T> { /** - * Serialized the given object. - * @param object The object to serialize. - * @return The serialized representation of the object. + * Serialize the object to the given output. + * @param object + * @param output */ - SER serialize(OBJ object); + void serialize(T object, Output output); /** - * Deserializes the given serialized representation of an object. - * @param object The serialized representation of an object. - * @param offset An offset in the serialized representation of the object. After the - * deserialize method completes the offset is updated, so that the offset points to - * the remaining unprocessed portion of the serialized object. For example:<br/> - * {@code - * Object obj; - * MutableInt mi; - * someObj1 = deserialize(obj, mi); - * someObj2 = deserialize(obj, mi); - * } + * Deserialize from the input and return a new object. * - * @return The deserialized object. + * @param input + * @return */ - OBJ deserialize(SER object, MutableInt offset); - - /** - * Deserializes the given serialized representation of an object. - * @param object The serialized representation of an object. - * - * @return The deserialized object. - */ - OBJ deserialize(SER object); + T deserialize(Input input); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java deleted file mode 100644 index eca1d5f..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.utils.serde; - -import java.util.Collection; - -import javax.validation.constraints.NotNull; - -import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.hadoop.classification.InterfaceStability; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; - -import com.datatorrent.lib.appdata.gpo.GPOUtils; -import com.datatorrent.netlet.util.Slice; - -/** - * This is an implementation of {@link Serde} which serializes and deserializes lists. - * - * @since 3.5.0 - */ [email protected] -public class SerdeCollectionSlice<T, CollectionT extends Collection<T>> implements Serde<CollectionT, Slice> -{ - @NotNull - private Serde<T, Slice> serde; - - @NotNull - private Class<? extends CollectionT> collectionClass; - - private SerdeCollectionSlice() - { - // for Kryo - } - - /** - * Creates a {@link SerdeCollectionSlice}. - * @param serde The {@link Serde} that is used to serialize and deserialize each element of a list. - */ - public SerdeCollectionSlice(@NotNull Serde<T, Slice> serde, @NotNull Class<? extends CollectionT> collectionClass) - { - this.serde = Preconditions.checkNotNull(serde); - this.collectionClass = Preconditions.checkNotNull(collectionClass); - } - - @Override - public Slice serialize(CollectionT objects) - { - Slice[] slices = new Slice[objects.size()]; - - int size = 4; - - int index = 0; - for (T object : objects) { - Slice slice = serde.serialize(object); - slices[index++] = slice; - size += slice.length; - } - - byte[] bytes = new byte[size]; - int offset = 0; - - byte[] sizeBytes = GPOUtils.serializeInt(objects.size()); - System.arraycopy(sizeBytes, 0, bytes, offset, 4); - offset += 4; - - for (index = 0; index < slices.length; index++) { - Slice slice = slices[index]; - System.arraycopy(slice.buffer, slice.offset, bytes, offset, slice.length); - offset += slice.length; - } - - return new Slice(bytes); - } - - @Override - public CollectionT deserialize(Slice slice, MutableInt offset) - { - MutableInt sliceOffset = new MutableInt(slice.offset + offset.intValue()); - - int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset); - sliceOffset.subtract(slice.offset); - try { - CollectionT collection = collectionClass.newInstance(); - - for (int index = 0; index < numElements; index++) { - T object = serde.deserialize(slice, sliceOffset); - collection.add(object); - } - - offset.setValue(sliceOffset.intValue()); - return collection; - } catch (Exception ex) { - throw Throwables.propagate(ex); - } - } - - @Override - public CollectionT deserialize(Slice slice) - { - return deserialize(slice, new MutableInt(0)); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java deleted file mode 100644 index 3275a93..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.utils.serde; - -import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.hadoop.classification.InterfaceStability; - -import com.datatorrent.lib.appdata.gpo.GPOUtils; -import com.datatorrent.netlet.util.Slice; - -/** - * This is an implementation of {@link Serde} which deserializes and serializes integers. - * - * @since 3.5.0 - */ [email protected] -public class SerdeIntSlice implements Serde<Integer, Slice> -{ - @Override - public Slice serialize(Integer object) - { - return new Slice(GPOUtils.serializeInt(object)); - } - - @Override - public Integer deserialize(Slice slice, MutableInt offset) - { - int val = GPOUtils.deserializeInt(slice.buffer, new MutableInt(slice.offset + offset.intValue())); - offset.add(4); - return val; - } - - @Override - public Integer deserialize(Slice object) - { - return deserialize(object, new MutableInt(0)); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java deleted file mode 100644 index d4b9488..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.utils.serde; - -import java.io.ByteArrayOutputStream; - -import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.hadoop.classification.InterfaceStability; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -import com.datatorrent.netlet.util.Slice; - -/** - * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because - * using Kryo makes the object being serialized rigid, meaning you won't be able to make backward compatible or - * incompatible changes to the class being serialized. - * - * @param <T> The type being serialized - */ [email protected] -public class SerdeKryoSlice<T> implements Serde<T, Slice> -{ - // Setup ThreadLocal of Kryo instances - private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() - { - protected Kryo initialValue() - { - Kryo kryo = new Kryo(); - // configure kryo instance, customize settings - return kryo; - } - }; - - private final Class<? extends T> clazz; - - public SerdeKryoSlice() - { - this.clazz = null; - } - - public SerdeKryoSlice(Class<? extends T> clazz) - { - this.clazz = clazz; - } - - @Override - public Slice serialize(T object) - { - Kryo kryo = kryos.get(); - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - Output output = new Output(stream); - if (clazz == null) { - kryo.writeClassAndObject(output, object); - } else { - kryo.writeObject(output, object); - } - return new Slice(output.toBytes()); - } - - @Override - public T deserialize(Slice slice, MutableInt offset) - { - byte[] bytes = slice.toByteArray(); - Kryo kryo = kryos.get(); - Input input = new Input(bytes, offset.intValue(), bytes.length - offset.intValue()); - T object; - if (clazz == null) { - object = (T)kryo.readClassAndObject(input); - } else { - object = kryo.readObject(input, clazz); - } - offset.setValue(bytes.length - input.position()); - return object; - } - - @Override - public T deserialize(Slice slice) - { - return deserialize(slice, new MutableInt(0)); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java deleted file mode 100644 index 6fe07d9..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.utils.serde; - -import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.hadoop.classification.InterfaceStability; - -import com.datatorrent.lib.appdata.gpo.GPOUtils; -import com.datatorrent.netlet.util.Slice; - -/** - * This is an implementation of {@link Serde} which deserializes and serializes integers. - * - * @since 3.5.0 - */ [email protected] -public class SerdeLongSlice implements Serde<Long, Slice> -{ - @Override - public Slice serialize(Long object) - { - return new Slice(GPOUtils.serializeLong(object)); - } - - @Override - public Long deserialize(Slice slice, MutableInt offset) - { - long val = GPOUtils.deserializeLong(slice.buffer, new MutableInt(slice.offset + offset.intValue())); - offset.add(8); - return val; - } - - @Override - public Long deserialize(Slice object) - { - return deserialize(object, new MutableInt(0)); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java deleted file mode 100644 index 59cf282..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.utils.serde; - -import javax.validation.constraints.NotNull; - -import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.classification.InterfaceStability; - -import com.google.common.base.Preconditions; - -import com.datatorrent.netlet.util.Slice; - -/** - * This is an implementation of {@link Serde} which serializes and deserializes pairs. - */ [email protected] -public class SerdePairSlice<T1, T2> implements Serde<Pair<T1, T2>, Slice> -{ - @NotNull - private Serde<T1, Slice> serde1; - @NotNull - private Serde<T2, Slice> serde2; - - private SerdePairSlice() - { - // for Kryo - } - - /** - * Creates a {@link SerdePairSlice}. - * @param serde1 The {@link Serde} that is used to serialize and deserialize first element of a pair - * @param serde2 The {@link Serde} that is used to serialize and deserialize second element of a pair - */ - public SerdePairSlice(@NotNull Serde<T1, Slice> serde1, @NotNull Serde<T2, Slice> serde2) - { - this.serde1 = Preconditions.checkNotNull(serde1); - this.serde2 = Preconditions.checkNotNull(serde2); - } - - @Override - public Slice serialize(Pair<T1, T2> pair) - { - int size = 0; - - Slice slice1 = serde1.serialize(pair.getLeft()); - size += slice1.length; - Slice slice2 = serde2.serialize(pair.getRight()); - size += slice2.length; - - byte[] bytes = new byte[size]; - System.arraycopy(slice1.buffer, slice1.offset, bytes, 0, slice1.length); - System.arraycopy(slice2.buffer, slice2.offset, bytes, slice1.length, slice2.length); - - return new Slice(bytes); - } - - @Override - public Pair<T1, T2> deserialize(Slice slice, MutableInt offset) - { - T1 first = serde1.deserialize(slice, offset); - T2 second = serde2.deserialize(slice, offset); - return new ImmutablePair<>(first, second); - } - - @Override - public Pair<T1, T2> deserialize(Slice slice) - { - return deserialize(slice, new MutableInt(0)); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java deleted file mode 100644 index aaf0d61..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.utils.serde; - -import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.hadoop.classification.InterfaceStability; - -import com.datatorrent.lib.appdata.gpo.GPOUtils; -import com.datatorrent.netlet.util.Slice; - -/** - * An implementation of {@link Serde} which serializes and deserializes {@link String}s. - * - * @since 3.5.0 - */ [email protected] -public class SerdeStringSlice implements Serde<String, Slice> -{ - @Override - public Slice serialize(String object) - { - return new Slice(GPOUtils.serializeString(object)); - } - - @Override - public String deserialize(Slice object, MutableInt offset) - { - offset.add(object.offset); - String string = GPOUtils.deserializeString(object.buffer, offset); - offset.subtract(object.offset); - return string; - } - - @Override - public String deserialize(Slice object) - { - return deserialize(object, new MutableInt(0)); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java new file mode 100644 index 0000000..f33f1e0 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.apex.malhar.lib.state.spillable.WindowListener; + +import com.esotericsoftware.kryo.io.Output; + +import com.datatorrent.netlet.util.Slice; + +public class SerializationBuffer extends Output implements WindowCompleteListener, WindowListener +{ + /* + * Singleton read buffer for serialization + */ + public static final SerializationBuffer READ_BUFFER = new SerializationBuffer(new WindowedBlockStream()); + + private WindowedBlockStream windowedBlockStream; + + @SuppressWarnings("unused") + private SerializationBuffer() + { + this(new WindowedBlockStream()); + } + + public SerializationBuffer(WindowedBlockStream windowedBlockStream) + { + super(windowedBlockStream); + this.windowedBlockStream = windowedBlockStream; + } + + public long size() + { + return windowedBlockStream.size(); + } + + public long capacity() + { + return windowedBlockStream.capacity(); + } + + /** + * This method should be called only after the whole object has been written + * @return The slice which represents the object + */ + public Slice toSlice() + { + this.flush(); + return windowedBlockStream.toSlice(); + } + + /** + * reset the environment to reuse the resource. + */ + public void reset() + { + windowedBlockStream.reset(); + } + + + @Override + public void beginWindow(long windowId) + { + windowedBlockStream.beginWindow(windowId); + } + + @Override + public void endWindow() + { + windowedBlockStream.endWindow(); + } + + public void release() + { + reset(); + windowedBlockStream.reset(); + } + + public WindowedBlockStream createWindowedBlockStream() + { + return new WindowedBlockStream(); + } + + public WindowedBlockStream createWindowedBlockStream(int capacity) + { + return new WindowedBlockStream(capacity); + } + + public WindowedBlockStream getWindowedBlockStream() + { + return windowedBlockStream; + } + + public void setWindowableByteStream(WindowedBlockStream windowableByteStream) + { + this.windowedBlockStream = windowableByteStream; + } + + /** + * reset for all windows with window id less than or equal to the input windowId + * this interface doesn't call reset window for each windows. Several windows can be reset at the same time. + * @param windowId + */ + @Override + public void completeWindow(long windowId) + { + windowedBlockStream.completeWindow(windowId); + } + + public byte[] toByteArray() + { + return toSlice().toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java index 2671d5e..b504581 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java @@ -100,4 +100,14 @@ public class SliceUtils return new Slice(bytes); } + + public static BufferSlice toBufferSlice(Slice slice) + { + if (slice instanceof BufferSlice) { + return (BufferSlice)slice; + } + + //The hashCode of Slice was not correct, so correct it + return new BufferSlice(slice); + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java new file mode 100644 index 0000000..cb45e2a --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * An implementation of {@link Serde} which serializes and deserializes {@link String}s. + * + * @since 3.5.0 + */ [email protected] +public class StringSerde implements Serde<String> +{ + @Override + public void serialize(String string, Output output) + { + output.writeString(string); + } + + @Override + public String deserialize(Input input) + { + return input.readString(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java new file mode 100644 index 0000000..d2d38a7 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +public interface WindowCompleteListener +{ + /** + * Notification that all windows which window id less or equal input windowId are complete + * + * @param windowId + */ + void completeWindow(long windowId); +}
