http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuredCoder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuredCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuredCoder.java new file mode 100644 index 0000000..3e299a6 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuredCoder.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.coder; + +import java.io.ByteArrayOutputStream; +import java.util.Collections; +import java.util.List; + +public abstract class StructuredCoder<T> extends Coder<T> { + protected StructuredCoder() {} + + public List<? extends Coder<?>> getComponents() { + List<? 
extends Coder<?>> coderArguments = getCoderArguments(); + if (coderArguments == null) { + return Collections.emptyList(); + } else { + return coderArguments; + } + } + + @Override + public boolean equals(Object o) { + if (o == null || this.getClass() != o.getClass()) { + return false; + } + StructuredCoder<?> that = (StructuredCoder<?>) o; + return this.getComponents().equals(that.getComponents()); + } + + @Override + public int hashCode() { + return getClass().hashCode() * 31 + getComponents().hashCode(); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + String s = getClass().getName(); + builder.append(s.substring(s.lastIndexOf('.') + 1)); + + List<? extends Coder<?>> componentCoders = getComponents(); + if (!componentCoders.isEmpty()) { + builder.append('('); + boolean first = true; + for (Coder<?> componentCoder : componentCoders) { + if (first) { + first = false; + } else { + builder.append(','); + } + builder.append(componentCoder.toString()); + } + builder.append(')'); + } + return builder.toString(); + } + + @Override + public boolean consistentWithEquals() { + return false; + } + + @Override + public Object structuralValue(T value) { + if (value != null && consistentWithEquals()) { + return value; + } else { + try { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + encode(value, os); + return new StructuralByteArray(os.toByteArray()); + } catch (Exception exn) { + throw new IllegalArgumentException( + "Unable to encode element '" + value + "' with coder '" + this + "'.", exn); + } + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarInt.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarInt.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarInt.java new file mode 100644 index 0000000..bebc1e4 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarInt.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Variable-length integer encoding: each byte carries seven payload bits, least
 * significant group first, and the high bit is set on every byte except the last.
 *
 * <p>Values are treated as unsigned bit patterns, so a negative {@code int} occupies
 * five bytes and a negative {@code long} occupies ten.
 */
public class VarInt {

  /** Widens an int to a long keeping only its low 32 bits (no sign extension). */
  private static long convertIntToLongNoSignExtend(int v) {
    return ((long) v) & 0xFFFFFFFFL;
  }

  /**
   * Writes the 32-bit pattern of {@code v} as a variable-length integer.
   *
   * @param v the value to encode
   * @param stream the destination
   * @throws IOException if writing to {@code stream} fails
   */
  public static void encode(int v, OutputStream stream) throws IOException {
    encode(convertIntToLongNoSignExtend(v), stream);
  }

  /**
   * Writes the 64-bit pattern of {@code v} as a variable-length integer.
   *
   * @param v the value to encode
   * @param stream the destination
   * @throws IOException if writing to {@code stream} fails
   */
  public static void encode(long v, OutputStream stream) throws IOException {
    long remaining = v;
    while (true) {
      long payload = remaining & 0x7F;
      remaining >>>= 7;
      boolean more = remaining != 0;
      // The continuation bit marks that at least one more byte follows.
      byte out = (byte) (more ? (payload | 0x80) : payload);
      stream.write(out);
      if (!more) {
        return;
      }
    }
  }

  /**
   * Reads a variable-length integer that must fit in 32 unsigned bits.
   *
   * @param stream the source
   * @return the decoded bit pattern as an {@code int}
   * @throws IOException if the decoded value does not fit, or if reading fails
   */
  public static int decodeInt(InputStream stream) throws IOException {
    long decoded = decodeLong(stream);
    boolean fitsIn32Bits = decoded >= 0 && decoded < (1L << 32);
    if (!fitsIn32Bits) {
      throw new IOException("varint overflow " + decoded);
    }
    return (int) decoded;
  }

  /**
   * Reads a variable-length integer of up to 64 bits.
   *
   * @param stream the source
   * @return the decoded value
   * @throws EOFException if the stream is exhausted before the first byte
   * @throws IOException if the stream ends mid-value or the value exceeds 64 bits
   */
  public static long decodeLong(InputStream stream) throws IOException {
    long accumulated = 0;
    int shift = 0;
    while (true) {
      int next = stream.read();
      if (next < 0) {
        // Distinguish a clean end of input from a value that was cut short.
        throw (shift == 0) ? new EOFException() : new IOException("varint not terminated");
      }
      long payload = next & 0x7F;
      // The tenth byte may only contribute the single remaining bit.
      if (shift >= 64 || (shift == 63 && payload > 1)) {
        throw new IOException("varint too long");
      }
      accumulated |= payload << shift;
      shift += 7;
      if ((next & 0x80) == 0) {
        return accumulated;
      }
    }
  }

  /**
   * Returns the number of bytes {@link #encode(int, OutputStream)} writes for {@code v}.
   */
  public static int getLength(int v) {
    return getLength(convertIntToLongNoSignExtend(v));
  }

  /**
   * Returns the number of bytes {@link #encode(long, OutputStream)} writes for {@code v}.
   */
  public static int getLength(long v) {
    int length = 1;
    for (long rest = v >>> 7; rest != 0; rest >>>= 7) {
      length++;
    }
    return length;
  }
}
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarIntCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarIntCoder.java new file mode 100644 index 0000000..7dac822 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarIntCoder.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.refactor.coder; + +import java.io.*; + +public class VarIntCoder extends AtomicCoder<Integer> { + + public static VarIntCoder of() { + return INSTANCE; + } + + ///////////////////////////////////////////////////////////////////////////// + + private static final VarIntCoder INSTANCE = new VarIntCoder(); + + private VarIntCoder() {} + + @Override + public void encode(Integer value, OutputStream outStream) + throws CoderException { + if (value == null) { + throw new CoderException("cannot encode a null Integer"); + } + try { + VarInt.encode(value.intValue(), outStream); + } catch (IOException e) { + throw new CoderException(e); + } + } + + @Override + public Integer decode(InputStream inStream) + throws CoderException { + try { + return VarInt.decodeInt(inStream); + } catch (EOFException | UTFDataFormatException exn) { + // These exceptions correspond to decoding problems, so change + // what kind of exception they're branded as. + throw new CoderException(exn); + } catch (Exception e) { + throw new CoderException(e); + } + } + + @Override + public void verifyDeterministic() {} + + @Override + public boolean consistentWithEquals() { + return true; + } + + @Override + public boolean isRegisterByteSizeObserverCheap(Integer value) { + return true; + } + + @Override + public long getEncodedElementByteSize(Integer value) { + if (value == null) { + throw new CoderException("cannot encode a null Integer"); + } + return VarInt.getLength(value.longValue()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarLongCoder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarLongCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarLongCoder.java new file mode 100644 index 0000000..15af634 --- 
/dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarLongCoder.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.coder; + +import java.io.*; +import java.util.Collections; +import java.util.List; + +public class VarLongCoder extends StructuredCoder<Long> { + public static VarLongCoder of() { + return INSTANCE; + } + + ///////////////////////////////////////////////////////////////////////////// + + private static final VarLongCoder INSTANCE = new VarLongCoder(); + + private VarLongCoder() {} + + @Override + public void encode(Long value, OutputStream outStream) + throws CoderException { + if (value == null) { + throw new CoderException("cannot encode a null Long"); + } + try { + VarInt.encode(value.longValue(), outStream); + } catch (IOException e) { + throw new CoderException(e); + } + } + + @Override + public Long decode(InputStream inStream) + throws CoderException { + try { + return VarInt.decodeLong(inStream); + } catch (EOFException | UTFDataFormatException exn) { + // These exceptions correspond to decoding problems, so change + // what kind of exception they're branded as. 
+ throw new CoderException(exn); + } catch (Exception e) { + throw new CoderException(e); + } + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Collections.emptyList(); + } + + @Override + public void verifyDeterministic() {} + + @Override + public boolean consistentWithEquals() { + return true; + } + + @Override + public boolean isRegisterByteSizeObserverCheap(Long value) { + return true; + } + + @Override + public long getEncodedElementByteSize(Long value) { + if (value == null) { + throw new CoderException("cannot encode a null Long"); + } + return VarInt.getLength(value.longValue()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VoidCoder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VoidCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VoidCoder.java new file mode 100644 index 0000000..f4d00f1 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VoidCoder.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.coder; + +import java.io.InputStream; +import java.io.OutputStream; + +public class VoidCoder extends AtomicCoder<Void> { + + public static VoidCoder of() { + return INSTANCE; + } + + ///////////////////////////////////////////////////////////////////////////// + + private static final VoidCoder INSTANCE = new VoidCoder(); + + private VoidCoder() { + } + + @Override + public void encode(Void value, OutputStream outStream) { + // Nothing to write! + } + + @Override + public Void decode(InputStream inStream) { + // Nothing to read! + return null; + } + + @Override + public void verifyDeterministic() { + } + + @Override + public boolean consistentWithEquals() { + return true; + } + + @Override + public boolean isRegisterByteSizeObserverCheap(Void value) { + return true; + } + + @Override + protected long getEncodedElementByteSize(Void value) { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala index 17c93bd..e706f4f 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala @@ -20,10 +20,16 @@ package org.apache.gearpump.streaming.refactor.dsl.window.impl import org.apache.gearpump.Message import org.apache.gearpump.streaming.dsl.window.api.Trigger +<<<<<<< 
HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala trait ReduceFnRunner { def process(message: Message): Unit def onTrigger(trigger: Trigger): Unit +======= +trait State { + + def clear: Unit +>>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala index d0b84cb..6665766 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala @@ -16,6 +16,7 @@ * limitations under the License. */ +<<<<<<< HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala package org.apache.gearpump.streaming.refactor.sink import akka.actor.ActorSystem @@ -33,4 +34,20 @@ object DataSinkProcessor { Processor[DataSinkTask](parallelism, description = description, taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink)) } +======= +package org.apache.gearpump.streaming.refactor.state + +import org.apache.gearpump.streaming.refactor.state.api.State + +trait StateTag[StateT <: State] extends Serializable { + + def appendTo(sb: Appendable) + + def getId: String + + def getSpec: StateSpec[StateT] + + def bind(binder: StateBinder): StateT + +>>>>>>> e6ce91c... 
[Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala new file mode 100644 index 0000000..4dbb07f --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gearpump.streaming.refactor.state

import java.util
import java.util.Map.Entry
import java.util.{ArrayList, HashSet, List, Set}
import java.lang.Iterable

import com.google.common.collect.{HashBasedTable, Table}
import org.apache.gearpump.streaming.refactor.coder.Coder
import org.apache.gearpump.streaming.refactor.state.InMemoryGlobalStateInternals.InMemoryStateBinder
import org.apache.gearpump.streaming.refactor.state.api._

/**
 * [[StateInternals]] for a single key that keeps every state cell in an in-memory
 * table addressed by (namespace, tag).
 *
 * @param key the key reported by `getKey`
 */
class InMemoryGlobalStateInternals[K] protected(key: K) extends StateInternals {

  /** Backing table; every namespace is served by a fresh [[InMemoryStateBinder]]. */
  protected val inMemoryStateTable: InMemoryGlobalStateInternals.StateTable =
    new InMemoryGlobalStateInternals.StateTable {
      override def binderForNamespace(namespace: StateNamespace): StateBinder = {
        new InMemoryStateBinder
      }
    }

  override def getKey: Any = key

  /** Returns the state cell stored under (namespace, address), creating it if absent. */
  override def state[T <: State](namespace: StateNamespace, address: StateTag[T]): T =
    inMemoryStateTable.get(namespace, address)

}

object InMemoryGlobalStateInternals {

  /**
   * Table of state cells keyed by namespace (row) and tag (column). Subclasses
   * supply the binder used to create a cell the first time it is requested.
   */
  abstract class StateTable {

    val stateTable: Table[StateNamespace, StateTag[_], State] = HashBasedTable.create()

    /**
     * Returns the cell stored under (namespace, tag). When none exists yet, one is
     * bound through the tag's spec, stored, and returned.
     *
     * The previous implementation evaluated the cached cell inside an `if` without
     * an `else` and then fell through, so the cached value was discarded and a
     * brand-new cell replaced it on every call, losing anything written earlier.
     */
    def get[StateT <: State](namespace: StateNamespace, tag: StateTag[StateT]): StateT = {
      val existing: State = stateTable.get(namespace, tag)
      if (existing != null) {
        existing.asInstanceOf[StateT]
      } else {
        val bound: StateT = tag.getSpec.bind(tag.getId, binderForNamespace(namespace))
        stateTable.put(namespace, tag, bound)
        bound
      }
    }

    /** Drops every cell stored under the given namespace. */
    def clearNamespace(namespace: StateNamespace): Unit = stateTable.rowKeySet().remove(namespace)

    /** Drops every cell in the table. */
    def clear: Unit = stateTable.clear()

    def values: Iterable[State] = stateTable.values().asInstanceOf[Iterable[State]]

    def isNamespaceInUse(namespace: StateNamespace): Boolean = stateTable.containsRow(namespace)

    def getTagsInUse(namespace: StateNamespace): java.util.Map[StateTag[_], State]
    = stateTable.row(namespace)

    def getNamespacesInUse(): java.util.Set[StateNamespace] = stateTable.rowKeySet()

    /** Supplies the binder used to create cells for the given namespace. */
    def binderForNamespace(namespace: StateNamespace): StateBinder

  }

  /**
   * Binder that creates the in-memory cell implementations defined below. The id,
   * spec and coder arguments are not used by any of the bind methods.
   */
  class InMemoryStateBinder extends StateBinder {

    override def bindValue[T](id: String, spec: StateSpec[ValueState[T]],
        coder: Coder[T]): ValueState[T] = new InMemoryValueState[T]()

    override def bindBag[T](id: String, spec: StateSpec[BagState[T]],
        elemCoder: Coder[T]): BagState[T] = new InMemoryBagState[T]()

    override def bindSet[T](id: String, spec: StateSpec[SetState[T]],
        elemCoder: Coder[T]): SetState[T] = new InMemorySetState[T]()

    override def bindMap[KeyT, ValueT](id: String, spec: StateSpec[MapState[KeyT, ValueT]],
        mapKeyCoder: Coder[KeyT], mapValueCoder: Coder[ValueT]): MapState[KeyT, ValueT] =
      new InMemoryMapState[KeyT, ValueT]()
  }

  /** Operations shared by every in-memory cell. */
  trait InMemoryState[T <: InMemoryState[T]] {

    /** True when the cell currently holds no data. */
    def isCleared: Boolean

    /** Returns a new cell holding a copy of this cell's current contents. */
    def copy: T

  }

  /** Bag cell backed by a `java.util.ArrayList`. */
  class InMemoryBagState[T] extends BagState[T] with InMemoryState[InMemoryBagState[T]] {

    private var contents: List[T] = new ArrayList[T]

    override def readLater: BagState[T] = this

    override def isCleared: Boolean = contents.isEmpty

    override def copy: InMemoryBagState[T] = {
      val that: InMemoryBagState[T] = new InMemoryBagState[T]
      that.contents.addAll(this.contents)
      that
    }

    override def add(value: T): Unit = contents.add(value)

    /** The returned view reads the bag's emptiness at the time `read` is called. */
    override def isEmpty: ReadableState[Boolean] = {
      new ReadableState[Boolean] {
        override def readLater: ReadableState[Boolean] = {
          this
        }

        override def read: Boolean = {
          contents.isEmpty
        }
      }
    }

    // Replaces the list instead of emptying it in place.
    override def clear: Unit = contents = new ArrayList[T]

    override def read: Iterable[T] = contents.asInstanceOf[Iterable[T]]

  }

  /** Single-value cell; `cleared` tracks whether a value has been written. */
  class InMemoryValueState[T] extends ValueState[T] with InMemoryState[InMemoryValueState[T]] {

    private var cleared: Boolean = true
    private var value: T = _

    def write(input: T): Unit = {
      cleared = false
      this.value = input
    }

    def readLater: InMemoryValueState[T] = this

    def isCleared: Boolean = cleared

    def copy: InMemoryValueState[T] = {
      val that: InMemoryValueState[T] = new InMemoryValueState[T]
      // A cleared cell copies to a freshly constructed (also cleared) cell.
      if (!this.cleared) {
        that.cleared = this.cleared
        that.value = this.value
      }

      that
    }

    def clear: Unit = {
      value = null.asInstanceOf[T]
      cleared = true
    }

    def read: T = value

  }

  /** Map cell backed by a `java.util.HashMap`. */
  class InMemoryMapState[K, V] extends MapState[K, V] with InMemoryState[InMemoryMapState[K, V]] {

    private var contents: util.Map[K, V] = new util.HashMap[K, V]

    override def put(key: K, value: V): Unit = contents.put(key, value)

    /**
     * Stores `value` only when the key currently maps to null or nothing. The result
     * holds the value that was present before the call, which is null when the new
     * value was stored.
     */
    override def putIfAbsent(key: K, value: V): ReadableState[V] = {
      var v: V = contents.get(key)
      if (v == null) {
        v = contents.put(key, value)
      }

      ReadableStates.immediate(v)
    }

    override def remove(key: K): Unit = contents.remove(key)

    override def get(key: K): ReadableState[V] = ReadableStates.immediate(contents.get(key))

    override def keys: ReadableState[Iterable[K]] =
      ReadableStates.immediate(contents.keySet().asInstanceOf[Iterable[K]])

    override def values: ReadableState[Iterable[V]] =
      ReadableStates.immediate(contents.values().asInstanceOf[Iterable[V]])

    override def entries: ReadableState[Iterable[Entry[K, V]]] =
      ReadableStates.immediate(contents.entrySet().asInstanceOf[Iterable[util.Map.Entry[K, V]]])

    override def isCleared: Boolean = contents.isEmpty

    override def copy: InMemoryMapState[K, V] = {
      val that: InMemoryMapState[K, V] = new InMemoryMapState
      that.contents.putAll(this.contents)
      that
    }

    // Replaces the map instead of emptying it in place.
    override def clear: Unit = contents = new util.HashMap[K, V]

  }

  /** Set cell backed by a `java.util.HashSet`. */
  class InMemorySetState[T] extends SetState[T] with InMemoryState[InMemorySetState[T]] {

    private var contents: Set[T] = new HashSet[T]

    override def contains(t: T): ReadableState[Boolean] =
      ReadableStates.immediate(contents.contains(t))

    /** Adds `t`; the result is true when `t` was not already in the set. */
    override def addIfAbsent(t: T): ReadableState[Boolean] = {
      val alreadyContained: Boolean = contents.contains(t)
      contents.add(t)
      ReadableStates.immediate(!alreadyContained)
    }

    override def remove(t: T): Unit = contents.remove(t)

    override def readLater: SetState[T] = this

    override def isCleared: Boolean = contents.isEmpty

    override def copy: InMemorySetState[T] = {
      val that: InMemorySetState[T] = new InMemorySetState[T]
      that.contents.addAll(this.contents)
      that
    }

    override def add(value: T): Unit = contents.add(value)

    /** The returned view reads the set's emptiness at the time `read` is called. */
    override def isEmpty: ReadableState[Boolean] = {
      new ReadableState[Boolean] {

        override def readLater: ReadableState[Boolean] = this

        override def read: Boolean = contents.isEmpty
      }
    }

    // Replaces the set instead of emptying it in place.
    override def clear: Unit = contents = new HashSet[T]

    override def read: Iterable[T] = contents.asInstanceOf[Iterable[T]]
  }

}

object ReadableStates {

  /** Wraps an already-computed value in a [[ReadableState]] that always returns it. */
  def immediate[T](value: T): ReadableState[T] = {
    new ReadableState[T] {
      override def readLater: ReadableState[T] = {
        this
      }

      override def read: T = {
        value
      }
    }
  }

}
Serializable { + + def bind(id: String, binder: StateBinder): StateT +>>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateBinder.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateBinder.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateBinder.scala new file mode 100644 index 0000000..db39142 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateBinder.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gearpump.streaming.refactor.state

import org.apache.gearpump.streaming.refactor.coder.Coder
import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, SetState, ValueState}

/**
 * Factory for concrete state cells. Each method receives the cell id, the spec that
 * requested the binding, and the coder(s) for the cell's contents, and returns the
 * cell of the corresponding kind.
 */
trait StateBinder {

  /** Creates a single-value cell. */
  def bindValue[T](
      id: String,
      spec: StateSpec[ValueState[T]],
      coder: Coder[T]): ValueState[T]

  /** Creates a bag cell. */
  def bindBag[T](
      id: String,
      spec: StateSpec[BagState[T]],
      elemCoder: Coder[T]): BagState[T]

  /** Creates a set cell. */
  def bindSet[T](
      id: String,
      spec: StateSpec[SetState[T]],
      elemCoder: Coder[T]): SetState[T]

  /** Creates a map cell. */
  def bindMap[KeyT, ValueT](
      id: String,
      spec: StateSpec[MapState[KeyT, ValueT]],
      mapKeyCoder: Coder[KeyT],
      mapValueCoder: Coder[ValueT]): MapState[KeyT, ValueT]

}
package org.apache.gearpump.streaming.refactor.state

/**
 * Identifies a namespace under which state cells are stored.
 */
trait StateNamespace {

  /** String form of this namespace. */
  def stringKey: String

  /** Appends the string form of this namespace to `sb`. */
  def appendTo(sb: Appendable): Unit

  /** Object to use as this namespace's cache key. */
  def getCacheKey: Object

}
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+/** Factory for the [[StateNamespace]] implementations; only the global namespace exists so far. */
+object StateNamespaces {
+
+  /** Returns a new global namespace; all global namespaces compare equal. */
+  def global: StateNamespace = {
+    new GlobalNameSpace
+  }
+
+  /** Kinds of namespace; used here only to derive the hash code of [[GlobalNameSpace]]. */
+  private object NameSpace extends Enumeration {
+    type NameSpace = Value
+    val GLOBAL, WINDOW, WINDOW_AND_TRIGGER = Value
+  }
+
+  /** The global namespace, whose string form, appended form and cache key are all "/". */
+  class GlobalNameSpace extends StateNamespace {
+
+    private val GLOBAL_STRING: String = "/"
+
+    override def stringKey: String = {
+      GLOBAL_STRING
+    }
+
+    override def appendTo(sb: Appendable): Unit = {
+      sb.append(GLOBAL_STRING)
+    }
+
+    override def getCacheKey: AnyRef = {
+      GLOBAL_STRING
+    }
+
+    /**
+     * Any two global namespaces are equal.
+     *
+     * A plain type test is used on purpose: `obj == this` dispatches to `obj.equals(this)`,
+     * which for two distinct instances of this class recursed until a StackOverflowError.
+     * `isInstanceOf` is false for null and true for `this`, so no other check is needed.
+     */
+    override def equals(obj: Any): Boolean = {
+      obj.isInstanceOf[GlobalNameSpace]
+    }
+
+    override def hashCode(): Int = {
+      Objects.hash(NameSpace.GLOBAL)
+    }
+  }
+
+  // TODO : implement WindowNamespace & WindowAndTriggerNamespace
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala
new file mode 100644
index 0000000..f056915
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, SetState, ValueState}
+
+/**
+ * Factory methods for the [[StateSpec]]s of value, bag, set and map state.
+ *
+ * Each spec holds the coder(s) for its state. A spec created without a coder starts with a
+ * null coder that can be supplied later through `offerCoders`; `finishSpecifying` throws an
+ * IllegalStateException if a coder is still missing.
+ */
+object StateSpecs {
+
+  /**
+   * Returns the coder offered at `index` cast to `Coder[C]`, or null when `coders` is null,
+   * shorter than `index + 1`, or holds null at that position.
+   */
+  private def offeredCoder[C](coders: Array[_ <: AnyRef], index: Int): Coder[C] = {
+    if (coders != null && index >= 0 && index < coders.length) {
+      coders(index).asInstanceOf[Coder[C]]
+    } else {
+      null
+    }
+  }
+
+  /** Spec of a [[ValueState]]; binds through `StateBinder.bindValue`. */
+  private class ValueStateSpec[T](coder: Coder[T]) extends StateSpec[ValueState[T]] {
+
+    // Mutable: a missing coder may be filled in later by offerCoders.
+    var aCoder: Coder[T] = coder
+
+    override def bind(id: String, binder: StateBinder): ValueState[T] = {
+      binder.bindValue(id, this, aCoder)
+    }
+
+    /** Adopts `coders(0)` as the value coder, but only if no coder has been set yet. */
+    override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = {
+      if (aCoder == null) {
+        aCoder = offeredCoder[T](coders, 0)
+      }
+    }
+
+    /** @throws IllegalStateException if no value coder is available */
+    override def finishSpecifying: Unit = {
+      if (aCoder == null) {
+        throw new IllegalStateException(
+          "Unable to infer a coder for ValueState and no Coder" +
+            " was specified. Please set a coder by either invoking" +
+            " StateSpecs.value(Coder<T> valueCoder) or by registering the coder in the" +
+            " Pipeline's CoderRegistry.")
+      }
+    }
+
+    // Pattern match instead of `obj == this` followed by a cast: `==` dispatches to
+    // obj.equals(this) and recursed forever for two distinct specs, and the unconditional
+    // cast threw on foreign types and on null.
+    override def equals(obj: Any): Boolean = obj match {
+      case that: ValueStateSpec[_] =>
+        (this eq that) || Objects.equals(this.aCoder, that.aCoder)
+      case _ => false
+    }
+
+    override def hashCode(): Int = {
+      Objects.hashCode(this.aCoder)
+    }
+  }
+
+  /** Spec of a [[BagState]]; binds through `StateBinder.bindBag`. */
+  private class BagStateSpec[T](coder: Coder[T]) extends StateSpec[BagState[T]] {
+
+    // Mutable: a missing coder may be filled in later by offerCoders.
+    private var elemCoder: Coder[T] = coder
+
+    override def bind(id: String, binder: StateBinder): BagState[T] =
+      binder.bindBag(id, this, elemCoder)
+
+    /** Adopts `coders(0)` as the element coder, but only if no coder has been set yet. */
+    override def offerCoders(coders: Array[Coder[BagState[T]]]): Unit = {
+      if (elemCoder == null) {
+        elemCoder = offeredCoder[T](coders, 0)
+      }
+    }
+
+    /** @throws IllegalStateException if no element coder is available */
+    override def finishSpecifying: Unit = {
+      if (elemCoder == null) {
+        throw new IllegalStateException("Unable to infer a coder for BagState and no Coder" +
+          " was specified. Please set a coder by either invoking" +
+          " StateSpecs.bag(Coder<T> elemCoder) or by registering the coder in the" +
+          " Pipeline's CoderRegistry.")
+      }
+    }
+
+    override def equals(obj: Any): Boolean = obj match {
+      case that: BagStateSpec[_] =>
+        (this eq that) || Objects.equals(this.elemCoder, that.elemCoder)
+      case _ => false
+    }
+
+    override def hashCode(): Int = Objects.hash(getClass, elemCoder)
+  }
+
+  /** Spec of a [[MapState]]; binds through `StateBinder.bindMap`. */
+  private class MapStateSpec[K, V](keyCoder: Coder[K], valueCoder: Coder[V])
+    extends StateSpec[MapState[K, V]] {
+
+    // Mutable: missing coders may be filled in later by offerCoders. All methods below read
+    // these fields rather than the constructor parameters so that offered coders take effect.
+    private var kCoder: Coder[K] = keyCoder
+    private var vCoder: Coder[V] = valueCoder
+
+    override def bind(id: String, binder: StateBinder): MapState[K, V] =
+      binder.bindMap(id, this, kCoder, vCoder)
+
+    /**
+     * Adopts `coders(0)` as key coder and `coders(1)` as value coder, each only if the
+     * corresponding coder has not been set yet.
+     */
+    override def offerCoders(coders: Array[Coder[MapState[K, V]]]): Unit = {
+      if (kCoder == null) {
+        kCoder = offeredCoder[K](coders, 0)
+      }
+
+      if (vCoder == null) {
+        vCoder = offeredCoder[V](coders, 1)
+      }
+    }
+
+    /** @throws IllegalStateException if the key coder or the value coder is missing */
+    override def finishSpecifying: Unit = {
+      if (kCoder == null || vCoder == null) {
+        throw new IllegalStateException("Unable to infer a coder for MapState and no Coder" +
+          " was specified. Please set a coder by either invoking" +
+          " StateSpecs.map(Coder<K> keyCoder, Coder<V> valueCoder) or by registering the" +
+          " coder in the Pipeline's CoderRegistry.")
+      }
+    }
+
+    override def hashCode(): Int = Objects.hash(getClass, kCoder, vCoder)
+
+    // Key coder is compared with key coder and value coder with value coder.
+    override def equals(obj: Any): Boolean = obj match {
+      case that: MapStateSpec[_, _] =>
+        (this eq that) ||
+          (Objects.equals(this.kCoder, that.kCoder) && Objects.equals(this.vCoder, that.vCoder))
+      case _ => false
+    }
+  }
+
+  /** Spec of a [[SetState]]; binds through `StateBinder.bindSet`. */
+  private class SetStateSpec[T](coder: Coder[T]) extends StateSpec[SetState[T]] {
+
+    // Mutable: a missing coder may be filled in later by offerCoders.
+    private var elemCoder: Coder[T] = coder
+
+    override def bind(id: String, binder: StateBinder): SetState[T] =
+      binder.bindSet(id, this, elemCoder)
+
+    /** Adopts `coders(0)` as the element coder, but only if no coder has been set yet. */
+    override def offerCoders(coders: Array[Coder[SetState[T]]]): Unit = {
+      if (elemCoder == null) {
+        elemCoder = offeredCoder[T](coders, 0)
+      }
+    }
+
+    /** @throws IllegalStateException if no element coder is available */
+    override def finishSpecifying: Unit = {
+      if (elemCoder == null) {
+        throw new IllegalStateException("Unable to infer a coder for SetState and no Coder" +
+          " was specified. Please set a coder by either invoking" +
+          " StateSpecs.set(Coder<T> elemCoder) or by registering the coder in the" +
+          " Pipeline's CoderRegistry.")
+      }
+    }
+
+    override def equals(obj: Any): Boolean = obj match {
+      case that: SetStateSpec[_] =>
+        (this eq that) || Objects.equals(this.elemCoder, that.elemCoder)
+      case _ => false
+    }
+
+    override def hashCode(): Int = Objects.hash(getClass, elemCoder)
+  }
+
+  /** Value-state spec without a coder; one must be offered before `finishSpecifying`. */
+  def value[T]: StateSpec[ValueState[T]] = new ValueStateSpec[T](null)
+
+  /**
+   * Value-state spec with an explicit coder.
+   *
+   * @throws NullPointerException if `valueCoder` is null
+   */
+  def value[T](valueCoder: Coder[T]): StateSpec[ValueState[T]] = {
+    if (valueCoder == null) {
+      throw new NullPointerException("valueCoder should not be null. Consider value() instead")
+    }
+
+    new ValueStateSpec[T](valueCoder)
+  }
+
+  /** Bag-state spec without a coder; one must be offered before `finishSpecifying`. */
+  def bag[T]: StateSpec[BagState[T]] = new BagStateSpec[T](null)
+
+  /** Bag-state spec with an explicit element coder. */
+  def bag[T](elemCoder: Coder[T]): StateSpec[BagState[T]] = new BagStateSpec[T](elemCoder)
+
+  /** Set-state spec without a coder; one must be offered before `finishSpecifying`. */
+  def set[T]: StateSpec[SetState[T]] = new SetStateSpec[T](null)
+
+  /** Set-state spec with an explicit element coder. */
+  def set[T](elemCoder: Coder[T]): StateSpec[SetState[T]] = new SetStateSpec[T](elemCoder)
+
+  /** Map-state spec without coders; both must be offered before `finishSpecifying`. */
+  def map[K, V]: StateSpec[MapState[K, V]] = new MapStateSpec[K, V](null, null)
+
+  /** Map-state spec with explicit key and value coders. */
+  def map[K, V](keyCoder: Coder[K], valueCoder: Coder[V]): StateSpec[MapState[K, V]] =
+    new MapStateSpec[K, V](keyCoder, valueCoder)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTags.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTags.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTags.scala
new file mode 100644
index 0000000..cbd050a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTags.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.StateTags.StateKind.StateKind
+import org.apache.gearpump.streaming.refactor.state.api._
+
+/**
+ * Factory methods for [[StateTag]]s: an id paired with the [[StateSpec]] of the state it names.
+ */
+object StateTags {
+
+  /** Kind of a state id; the value names ("s", "u") are the prefix written by `appendTo`. */
+  object StateKind extends Enumeration {
+    type StateKind = Value
+    val SYSTEM = Value("s")
+    val USER = Value("u")
+  }
+
+  /** A tag that can be re-created under a different [[StateKind]]. */
+  private trait SystemStateTag[StateT <: State] {
+    def asKind(kind: StateKind): StateTag[StateT]
+  }
+
+  /** Tag of kind USER for an arbitrary spec. */
+  def tagForSpec[StateT <: State](id: String, spec: StateSpec[StateT]): StateTag[StateT] =
+    new SimpleStateTag[StateT](new StructureId(id), spec)
+
+  /** Tag of kind USER for a value state encoded with `valueCoder`. */
+  def value[T](id: String, valueCoder: Coder[T]): StateTag[ValueState[T]] =
+    new SimpleStateTag[ValueState[T]](new StructureId(id), StateSpecs.value(valueCoder))
+
+  /** Tag of kind USER for a bag state whose elements are encoded with `elemCoder`. */
+  def bag[T](id: String, elemCoder: Coder[T]): StateTag[BagState[T]] =
+    new SimpleStateTag[BagState[T]](new StructureId(id), StateSpecs.bag(elemCoder))
+
+  /** Tag of kind USER for a set state whose elements are encoded with `elemCoder`. */
+  def set[T](id: String, elemCoder: Coder[T]): StateTag[SetState[T]] =
+    new SimpleStateTag[SetState[T]](new StructureId(id), StateSpecs.set(elemCoder))
+
+  /** Tag of kind USER for a map state with the given key and value coders. */
+  def map[K, V](id: String, keyCoder: Coder[K], valueCoder: Coder[V]): StateTag[MapState[K, V]] =
+    new SimpleStateTag[MapState[K, V]](new StructureId(id), StateSpecs.map(keyCoder, valueCoder))
+
+  /** The only [[StateTag]] implementation: a [[StructureId]] plus a spec. */
+  private class SimpleStateTag[StateT <: State](id: StructureId, spec: StateSpec[StateT])
+    extends StateTag[StateT] with SystemStateTag[StateT] {
+
+    val aSpec: StateSpec[StateT] = spec
+    val aId: StructureId = id
+
+    override def appendTo(sb: Appendable): Unit = aId.appendTo(sb)
+
+    /** The raw id, without the kind prefix. */
+    override def getId: String = aId.getRawId
+
+    override def getSpec: StateSpec[StateT] = aSpec
+
+    override def bind(binder: StateBinder): StateT = aSpec.bind(aId.getRawId, binder)
+
+    override def asKind(kind: StateKind): StateTag[StateT] =
+      new SimpleStateTag[StateT](aId.asKind(kind), aSpec)
+
+    override def hashCode(): Int = Objects.hash(getClass, getId, getSpec)
+
+    // Pattern match so that foreign types and null yield false; the previous guard
+    // `if (!isInstanceOf) false` discarded its result and the cast that followed threw.
+    override def equals(obj: Any): Boolean = obj match {
+      case that: SimpleStateTag[_] =>
+        (this eq that) ||
+          (Objects.equals(getId, that.getId) && Objects.equals(getSpec, that.getSpec))
+      case _ => false
+    }
+  }
+
+  /** A raw id qualified by a [[StateKind]]; ids created from a raw id alone are of kind USER. */
+  private class StructureId(kind: StateKind, rawId: String) extends Serializable {
+
+    private val k: StateKind = kind
+    private val r: String = rawId
+
+    def this(rawId: String) = this(StateKind.USER, rawId)
+
+    /** Same raw id under a different kind. */
+    def asKind(kind: StateKind): StructureId = new StructureId(kind, r)
+
+    /** Appends the kind name followed directly by the raw id. */
+    def appendTo(sb: Appendable): Unit = sb.append(k.toString).append(r)
+
+    def getRawId: String = r
+
+    override def hashCode(): Int = Objects.hash(k, r)
+
+    // `obj == this` dispatches to obj.equals(this) and recursed forever for two distinct
+    // ids; the guards also discarded their results. A pattern match avoids both.
+    override def equals(obj: Any): Boolean = obj match {
+      case that: StructureId =>
+        (this eq that) || (Objects.equals(k, that.k) && Objects.equals(r, that.r))
+      case _ => false
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
index 6d72e78..0f94052 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
@@ -34,6 +34,13 @@
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 import org.apache.gearpump.util.LogUtil
 import org.apache.gearpump.{Message, TimeStamp}
+<<<<<<< HEAD
+=======
+object StatefulTask {
+  val LOG = LogUtil.getLogger(getClass)
+} + +>>>>>>> e6ce91c... [Gearpump 311] refactor state management abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { @@ -53,7 +60,11 @@ abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig) // core state data var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = null +<<<<<<< HEAD def open(runtimeContext: RuntimeContext): Unit = {} +======= + def open: Unit = {} +>>>>>>> e6ce91c... [Gearpump 311] refactor state management def invoke(message: Message): Unit http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/BagState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/BagState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/BagState.scala new file mode 100644 index 0000000..38d918e --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/BagState.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+import java.lang.Iterable
+
+/**
+ * A [[GroupingState]] that accepts values of type `T` and is read back as a
+ * `java.lang.Iterable[T]`.
+ */
+trait BagState[T] extends GroupingState[T, Iterable[T]] {
+
+  // Narrows the return type of readLater to BagState[T].
+  // NOTE(review): presumably a prefetch hint as in Apache Beam's state API -- confirm.
+  def readLater: BagState[T]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/CombiningState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/CombiningState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/CombiningState.scala
new file mode 100644
index 0000000..640cc9e
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/CombiningState.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+/**
+ * A [[GroupingState]] that accepts `InputT` values, is read back as `OutputT`, and also
+ * exposes an accumulator of type `AccumT`.
+ */
+trait CombiningState[InputT, AccumT, OutputT] extends GroupingState[InputT, OutputT] {
+
+  /** Returns the accumulator. */
+  def getAccum: AccumT
+
+  /** Adds an accumulator to this state. */
+  // Result type spelled out: a declaration without one relies on deprecated procedure syntax.
+  def addAccum(accumT: AccumT): Unit
+
+  /** Merges the given accumulators into a single accumulator and returns it. */
+  def mergeAccumulators(accumulators: Iterable[AccumT]): AccumT
+
+  // Narrows the return type of readLater to this trait.
+  // NOTE(review): presumably a prefetch hint as in Apache Beam's state API -- confirm.
+  def readLater: CombiningState[InputT, AccumT, OutputT]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/GroupingState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/GroupingState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/GroupingState.scala
new file mode 100644
index 0000000..2f8939a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/GroupingState.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+/**
+ * State that accepts values of type `InputT` one at a time and is read back as `OutputT`.
+ */
+trait GroupingState[InputT, OutputT] extends ReadableState[OutputT] with State {
+
+  /** Adds `value` to this state. */
+  def add(value: InputT): Unit
+
+  /** Returns a readable handle whose value tells whether this state is empty. */
+  def isEmpty: ReadableState[Boolean]
+
+  // Narrows the return type of readLater to GroupingState.
+  // NOTE(review): presumably a prefetch hint as in Apache Beam's state API -- confirm.
+  def readLater: GroupingState[InputT, OutputT]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/MapState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/MapState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/MapState.scala
new file mode 100644
index 0000000..25de704
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/MapState.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+import java.lang.Iterable
+
+/**
+ * State holding a mapping from keys of type `K` to values of type `V`.
+ *
+ * Writes (`put`, `remove`) return nothing; every query returns a [[ReadableState]] handle
+ * rather than the value itself.
+ */
+trait MapState[K, V] extends State {
+
+  /** Associates `value` with `key`. */
+  def put(key : K, value : V): Unit
+
+  // NOTE(review): presumably stores `value` only when `key` has no mapping and exposes the
+  // previously stored value through the returned handle, as in Apache Beam -- confirm.
+  def putIfAbsent(key : K, value : V): ReadableState[V]
+
+  /** Removes the mapping for `key`. */
+  def remove(key : K): Unit
+
+  /** Returns a handle for the value associated with `key`. */
+  def get(key : K): ReadableState[V]
+
+  /** Returns a handle for all keys. */
+  def keys: ReadableState[Iterable[K]]
+
+  /** Returns a handle for all values. */
+  def values: ReadableState[Iterable[V]]
+
+  /** Returns a handle for all key/value entries. */
+  def entries: ReadableState[Iterable[java.util.Map.Entry[K, V]]]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ReadableState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ReadableState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ReadableState.scala
new file mode 100644
index 0000000..f6f4d98
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ReadableState.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+/** A handle from which a value of type `T` can be read. */
+trait ReadableState[T] {
+
+  /** Reads the current value. */
+  def read: T
+
+  // NOTE(review): presumably a hint that the value will be read soon, returning a handle to
+  // read it from, as in Apache Beam's ReadableState -- confirm. HeapValueState in this
+  // change set simply returns itself.
+  def readLater: ReadableState[T]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/SetState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/SetState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/SetState.scala
new file mode 100644
index 0000000..e1990b2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/SetState.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+import java.lang.Iterable
+
+/**
+ * A [[GroupingState]] that accepts values of type `T`, is read back as a
+ * `java.lang.Iterable[T]`, and additionally supports membership queries and removal.
+ */
+trait SetState[T] extends GroupingState[T, Iterable[T]]{
+
+  /** Returns a handle whose value tells whether `t` is in the set. */
+  def contains(t: T): ReadableState[Boolean]
+
+  // NOTE(review): presumably adds `t` only when it is not yet present and reports through
+  // the returned handle whether it was added, as in Apache Beam -- confirm.
+  def addIfAbsent(t: T): ReadableState[Boolean]
+
+  /** Removes `t` from the set. */
+  def remove(t: T): Unit
+
+  // Narrows the return type of readLater to SetState[T].
+  def readLater: SetState[T]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternals.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternals.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternals.scala
new file mode 100644
index 0000000..e3a136d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternals.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+import org.apache.gearpump.streaming.refactor.state.{StateNamespace, StateTag}
+
+/**
+ * Access point to the state cells that belong to one key.
+ */
+trait StateInternals {
+
+  /** The key this instance holds state for. */
+  def getKey: Any
+
+  /**
+   * Returns the state cell addressed by `address` within `namespace`.
+   *
+   * @param namespace namespace the cell lives in
+   * @param address   tag identifying the cell and its type
+   */
+  def state[T <: State](namespace: StateNamespace, address: StateTag[T]): T
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternalsFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternalsFactory.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternalsFactory.scala
new file mode 100644
index 0000000..215528c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternalsFactory.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+/** Serializable factory that produces the [[StateInternals]] for a given key. */
+trait StateInternalsFactory[K] extends Serializable {
+
+  /** Returns the state internals for `key`. */
+  def stateInternalsForKey(key: K): StateInternals
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ValueState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ValueState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ValueState.scala
new file mode 100644
index 0000000..3555ec4
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ValueState.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+/** State holding a single value of type `T` that can be written and read back. */
+trait ValueState[T] extends ReadableState[T] with State {
+
+  /** Stores `input` as the value of this state. */
+  def write(input : T): Unit
+
+  // Narrows the return type of readLater to ValueState[T]; HeapValueState in this change
+  // set returns itself.
+  def readLater: ValueState[T]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala
new file mode 100644
index 0000000..12b6e42
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

package org.apache.gearpump.streaming.refactor.state.heap

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.lang.Iterable
import java.util
import java.util.Map.Entry
import java.util._
import java.util.Objects

import com.google.common.collect.Table
import org.apache.gearpump.streaming.refactor.coder.{Coder, ListCoder, MapCoder, SetCoder}
import org.apache.gearpump.streaming.refactor.state.{StateBinder, StateNamespace, StateSpec, StateTag}
import org.apache.gearpump.streaming.refactor.state.api._
import org.apache.gearpump.util.LogUtil

/**
 * [[StateInternals]] for a single key, backed by a Guava `Table` of encoded bytes.
 *
 * Every state cell is stored as one coder-encoded byte array in `stateTable`, addressed by
 * the namespace's `stringKey` (row) and the state tag's id (column). Each read decodes the
 * stored bytes and each mutation re-encodes and stores the whole value, so collection cells
 * (bag, set, map) rewrite their complete content on every change.
 *
 * @param key        the key this instance is scoped to; returned unchanged by `getKey`
 * @param stateTable table holding the encoded state; mutated in place by the state cells
 * @tparam K type of the key
 */
class HeapStateInternals[K](key: K, stateTable: Table[String, String, Array[Byte]])
  extends StateInternals {

  val LOG = LogUtil.getLogger(getClass)

  /**
   * Binder passed to `StateTag.bind`; creates the heap-backed cell for the requested kind
   * of state. The `id` and `spec` arguments of the bind methods are not used: the cell is
   * addressed by the namespace and tag given to this binder.
   */
  private class HeapStateBinder(namespace: StateNamespace, address: StateTag[_])
    extends StateBinder {

    private val ns: StateNamespace = namespace
    private val addr: StateTag[_] = address

    override def bindValue[T](id: String, spec: StateSpec[ValueState[T]],
        coder: Coder[T]): ValueState[T] =
      new HeapValueState[T](ns, addr.asInstanceOf[StateTag[ValueState[T]]], coder)

    override def bindBag[T](id: String, spec: StateSpec[BagState[T]],
        elemCoder: Coder[T]): BagState[T] =
      new HeapBagState[T](ns, addr.asInstanceOf[StateTag[BagState[T]]], elemCoder)

    override def bindSet[T](id: String, spec: StateSpec[SetState[T]],
        elemCoder: Coder[T]): SetState[T] =
      new HeapSetState[T](ns, addr.asInstanceOf[StateTag[SetState[T]]], elemCoder)

    override def bindMap[KeyT, ValueT](id: String, spec: StateSpec[MapState[KeyT, ValueT]],
        mapKeyCoder: Coder[KeyT], mapValueCoder: Coder[ValueT]): MapState[KeyT, ValueT] =
      new HeapMapState[KeyT, ValueT](ns,
        addr.asInstanceOf[StateTag[MapState[KeyT, ValueT]]], mapKeyCoder, mapValueCoder)

  }

  override def getKey: Any = key

  /**
   * Returns the state cell for `address` within `namespace` by letting the tag bind itself
   * against a fresh [[HeapStateBinder]].
   */
  override def state[T <: State](namespace: StateNamespace, address: StateTag[T]): T =
    address.bind(new HeapStateBinder(namespace, address))

  /**
   * Common storage logic of all heap cells: one encoded value per (namespace, tag) slot.
   *
   * @param namespace namespace whose `stringKey` is the table row
   * @param address   tag whose id is the table column
   * @param coder     coder used to encode and decode the stored value
   */
  private class AbstractState[T](namespace: StateNamespace, address: StateTag[_ <: State],
      coder: Coder[T]) {

    protected val ns: StateNamespace = namespace
    protected val addr: StateTag[_ <: State] = address
    protected val c: Coder[T] = coder

    /**
     * Decodes the value stored in this cell's slot.
     *
     * @return the decoded value, or `null` when the slot holds no bytes
     * @throws RuntimeException wrapping any exception raised while decoding
     */
    protected def readValue: T = {
      val buf: Array[Byte] = stateTable.get(ns.stringKey, addr.getId)
      if (buf == null) {
        null.asInstanceOf[T]
      } else {
        val is: InputStream = new ByteArrayInputStream(buf)
        try {
          c.decode(is)
        } catch {
          case ex: Exception => throw new RuntimeException(ex)
        }
      }
    }

    /**
     * Encodes `input` and stores the bytes in this cell's slot, replacing earlier content.
     *
     * @throws RuntimeException wrapping any exception raised while encoding or storing
     */
    def writeValue(input: T): Unit = {
      val output: ByteArrayOutputStream = new ByteArrayOutputStream()
      try {
        c.encode(input, output)
        stateTable.put(ns.stringKey, addr.getId, output.toByteArray)
      } catch {
        case ex: Exception => throw new RuntimeException(ex)
      }
    }

    /** Removes this cell's slot from the table. */
    def clear: Unit = stateTable.remove(ns.stringKey, addr.getId)

    override def hashCode(): Int = Objects.hash(ns, addr)

    /**
     * Two cells are equal when they have the same runtime class, namespace and tag.
     *
     * The checks form a single if/else chain so that each guard actually decides the
     * result, and identity is tested with `eq` because `==` would dispatch back into this
     * method.
     */
    override def equals(obj: Any): Boolean = {
      if (obj.asInstanceOf[AnyRef] eq this) {
        true
      } else if (null == obj || getClass != obj.getClass) {
        false
      } else {
        val that: AbstractState[_] = obj.asInstanceOf[AbstractState[_]]
        Objects.equals(ns, that.ns) && Objects.equals(addr, that.addr)
      }
    }
  }

  /** Single-value cell; reads and writes go straight to the underlying slot. */
  private class HeapValueState[T](namespace: StateNamespace,
      address: StateTag[ValueState[T]], coder: Coder[T])
    extends AbstractState[T](namespace, address, coder) with ValueState[T] {

    override def write(input: T): Unit = writeValue(input)

    override def readLater: ValueState[T] = this

    override def read: T = readValue
  }

  /**
   * Map cell; the whole `java.util.Map` is stored as one value encoded with a `MapCoder`
   * built from the key and value coders.
   */
  private class HeapMapState[MapKT, MapVT](namespace: StateNamespace,
      address: StateTag[MapState[MapKT, MapVT]], mapKCoder: Coder[MapKT], mapVCoder: Coder[MapVT])
    extends AbstractState[Map[MapKT, MapVT]](
      namespace, address, MapCoder.of(mapKCoder, mapVCoder))
    with MapState[MapKT, MapVT] {

    /** Decodes the stored map; a missing or empty map is replaced by a new `HashMap`. */
    private def readMap: Map[MapKT, MapVT] = {
      val stored = super.readValue
      if (stored == null || stored.size() == 0) {
        new util.HashMap[MapKT, MapVT]
      } else {
        stored
      }
    }

    override def put(key: MapKT, value: MapVT): Unit = {
      val map = readMap
      map.put(key, value)
      super.writeValue(map)
    }

    /**
     * Adds the mapping when `key` is absent and writes the map back in either case.
     *
     * @return a state whose `read` yields what `java.util.Map.putIfAbsent` returned
     */
    override def putIfAbsent(key: MapKT, value: MapVT): ReadableState[MapVT] = {
      val map = readMap
      val previousVal = map.putIfAbsent(key, value)
      super.writeValue(map)
      new ReadableState[MapVT] {

        override def readLater: ReadableState[MapVT] = this

        override def read: MapVT = previousVal
      }
    }

    override def remove(key: MapKT): Unit = {
      val map = readMap
      map.remove(key)
      super.writeValue(map)
    }

    /**
     * The map is decoded when `get` is called; the lookup itself happens on `read`, against
     * that decoded copy.
     */
    override def get(key: MapKT): ReadableState[MapVT] = {
      val map = readMap
      new ReadableState[MapVT] {

        override def read: MapVT = map.get(key)

        override def readLater: ReadableState[MapVT] = this
      }
    }

    override def keys: ReadableState[Iterable[MapKT]] = {
      val map = readMap
      new ReadableState[Iterable[MapKT]] {

        override def readLater: ReadableState[Iterable[MapKT]] = this

        override def read: Iterable[MapKT] = map.keySet()
      }
    }

    override def values: ReadableState[Iterable[MapVT]] = {
      val map = readMap
      new ReadableState[Iterable[MapVT]] {

        override def readLater: ReadableState[Iterable[MapVT]] = this

        override def read: Iterable[MapVT] = map.values()
      }
    }

    override def entries: ReadableState[Iterable[Entry[MapKT, MapVT]]] = {
      val map = readMap
      new ReadableState[Iterable[Entry[MapKT, MapVT]]] {

        override def readLater: ReadableState[Iterable[Entry[MapKT, MapVT]]] = this

        override def read: Iterable[Entry[MapKT, MapVT]] = map.entrySet()
      }
    }

    /** Empties the map and writes the empty map back to the slot. */
    override def clear: Unit = {
      val map = readMap
      map.clear()
      super.writeValue(map)
    }
  }

  /** Bag cell; the elements are stored as one `java.util.List` encoded with a `ListCoder`. */
  private class HeapBagState[T](namespace: StateNamespace,
      address: StateTag[BagState[T]], coder: Coder[T])
    extends AbstractState[List[T]](namespace, address, ListCoder.of(coder)) with BagState[T] {

    override def readLater: BagState[T] = this

    override def add(input: T): Unit = {
      val value: List[T] = read
      value.add(input)
      writeValue(value)
    }

    /** The returned state reports `true` when the table holds no entry for this cell. */
    override def isEmpty: ReadableState[Boolean] = {
      new ReadableState[Boolean] {

        override def readLater: ReadableState[Boolean] = this

        override def read: Boolean = stateTable.get(ns.stringKey, addr.getId) == null
      }
    }

    /** Decodes the stored list; a missing or empty list is replaced by a new `ArrayList`. */
    override def read: List[T] = {
      val stored: List[T] = super.readValue
      if (stored == null || stored.size() == 0) {
        new ArrayList[T]
      } else {
        stored
      }
    }
  }

  /** Set cell; the elements are stored as one `java.util.Set` encoded with a `SetCoder`. */
  private class HeapSetState[T](namespace: StateNamespace,
      address: StateTag[SetState[T]], coder: Coder[T])
    extends AbstractState[Set[T]](namespace, address, SetCoder.of(coder)) with SetState[T] {

    override def contains(t: T): ReadableState[Boolean] = {
      val set = read
      new ReadableState[Boolean] {

        override def readLater: ReadableState[Boolean] = this

        override def read: Boolean = set.contains(t)
      }
    }

    /**
     * Adds `t` and writes the set back.
     *
     * @return a state whose `read` yields what `java.util.Set.add` returned
     */
    override def addIfAbsent(t: T): ReadableState[Boolean] = {
      val set = read
      val success = set.add(t)
      super.writeValue(set)
      new ReadableState[Boolean] {

        override def readLater: ReadableState[Boolean] = this

        override def read: Boolean = success
      }
    }

    override def remove(t: T): Unit = {
      val set = read
      set.remove(t)
      writeValue(set)
    }

    override def readLater: SetState[T] = this

    override def add(value: T): Unit = {
      val set = read
      set.add(value)
      writeValue(set)
    }

    override def isEmpty: ReadableState[Boolean] = {
      val set = read
      new ReadableState[Boolean] {

        override def readLater: ReadableState[Boolean] = this

        override def read: Boolean = set.isEmpty
      }
    }

    /** Decodes the stored set; a missing or empty set is replaced by a new `HashSet`. */
    override def read: Set[T] = {
      val stored: Set[T] = super.readValue
      if (stored == null || stored.size() == 0) {
        new util.HashSet[T]()
      } else {
        stored
      }
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsFactory.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsFactory.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsFactory.scala new file mode 100644 index 0000000..db20d66 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsFactory.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.gearpump.streaming.refactor.state.heap

import org.apache.gearpump.streaming.refactor.coder.{Coder, CoderException, CoderUtils}
import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, StateInternalsFactory}
import java.util._

import com.google.common.collect.{HashBasedTable, Table}
import org.apache.gearpump.util.LogUtil

/**
 * Creates [[HeapStateInternals]] instances, one state table per key.
 *
 * Keys are encoded with `keyCoder` and the Base64 form of the encoded bytes is used as the
 * lookup key into `map`.
 *
 * @param keyCoder coder used to turn a key into bytes
 * @param map      per-key state tables; a table is added the first time a key is requested
 * @tparam K type of the key
 */
class HeapStateInternalsFactory[K](keyCoder: Coder[K],
    map: Map[String, Table[String, String, Array[Byte]]])
  extends StateInternalsFactory[K] with Serializable {

  private val LOG = LogUtil.getLogger(getClass)

  private val kc: Coder[K] = keyCoder
  private val perKeyState: Map[String, Table[String, String, Array[Byte]]] = map

  /** Returns the coder this factory encodes keys with. */
  def getKeyCoder: Coder[K] = kc

  /**
   * Returns state internals bound to `key`, creating and registering an empty table when
   * the key has none yet.
   *
   * @param key the key to scope the state to
   * @throws RuntimeException when `key` is `null`
   */
  override def stateInternalsForKey(key: K): StateInternals = {
    // A null key is never encoded; it is rejected up front.
    if (key == null) {
      throw new RuntimeException("key bytes is null or empty, encode key occurs a error")
    }

    val encodedKey = Base64.getEncoder.encodeToString(CoderUtils.encodeToByteArray(kc, key))
    val tableForKey = Option(perKeyState.get(encodedKey)).getOrElse {
      LOG.info("stateTable is null, will create!")
      val created: Table[String, String, Array[Byte]] = HashBasedTable.create()
      perKeyState.put(encodedKey, created)
      created
    }

    new HeapStateInternals[K](key, tableForKey)
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7068699d/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsProxy.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsProxy.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsProxy.scala new file mode 100644 index 0000000..2f85dd9 --- /dev/null +++
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsProxy.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.gearpump.streaming.refactor.state.heap

import org.apache.gearpump.streaming.refactor.coder.Coder
import org.apache.gearpump.streaming.refactor.state.{StateNamespace, StateTag}
import org.apache.gearpump.streaming.refactor.state.api.{State, StateInternals, StateInternalsFactory}

/**
 * [[StateInternals]] that forwards every state lookup to the internals of a "current" key.
 *
 * The current key is set with `setKey`; it is held in a `@transient` field.
 *
 * @param heapStateInternalsFactory factory that supplies the per-key state internals
 * @tparam K type of the key
 */
class HeapStateInternalsProxy[K](heapStateInternalsFactory: HeapStateInternalsFactory[K])
  extends StateInternals with Serializable {

  private val factory: HeapStateInternalsFactory[K] = heapStateInternalsFactory

  @transient
  private var currentKey: K = _

  /** Returns the factory this proxy delegates to. */
  def getFactory: StateInternalsFactory[K] = factory

  /** Returns the key coder of the underlying factory. */
  def getKeyCoder: Coder[K] = factory.getKeyCoder

  /** Returns the key most recently passed to `setKey`. */
  override def getKey: K = currentKey

  /** Makes `key` the key that later `state` calls are resolved against. */
  def setKey(key: K): Unit = {
    currentKey = key
  }

  /**
   * Resolves the state cell for `address` in `namespace` using the internals that the
   * factory returns for the current key.
   */
  override def state[T <: State](namespace: StateNamespace, address: StateTag[T]): T = {
    val internalsForKey = factory.stateInternalsForKey(currentKey)
    internalsForKey.state(namespace, address)
  }
}
