[Gearpump 311] refactor state management
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/db8abf99 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/db8abf99 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/db8abf99 Branch: refs/heads/state Commit: db8abf99fe3dd7c8d00975cbc0832cfa7e3b240f Parents: 7068699 Author: vinoyang <[email protected]> Authored: Wed Jun 21 23:51:47 2017 +0800 Committer: vinoyang <[email protected]> Committed: Sun Jul 23 10:41:01 2017 +0800 ---------------------------------------------------------------------- .../streaming/refactor/coder/Coder.java | 2 -- .../dsl/window/impl/ReduceFnRunner.scala | 8 +---- .../refactor/sink/DataSinkProcessor.scala | 21 ++----------- .../refactor/state/RuntimeContext.scala | 9 ------ .../streaming/refactor/state/StateSpec.scala | 32 +++++++++++++++++++ .../streaming/refactor/state/StateTag.scala | 33 ++++++++++++++++++++ .../streaming/refactor/state/StatefulTask.scala | 12 ------- .../streaming/refactor/state/api/State.scala | 25 +++++++++++++++ 8 files changed, 93 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java index e1999ed..edbe9a1 100644 --- a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java @@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.refactor.coder; import com.google.common.base.Joiner; -import com.google.common.base.Objects; import com.google.common.io.ByteStreams; import com.google.common.io.CountingOutputStream; @@ -130,5 +129,4 @@ public abstract class Coder<T> implements Serializable { coder, Joiner.on("%n ").join(reasons)); } } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala index e706f4f..3fb8034 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala @@ -20,16 +20,10 @@ package org.apache.gearpump.streaming.refactor.dsl.window.impl import org.apache.gearpump.Message import org.apache.gearpump.streaming.dsl.window.api.Trigger -<<<<<<< HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala trait ReduceFnRunner { def process(message: Message): Unit def onTrigger(trigger: Trigger): Unit -======= -trait State { - def clear: Unit ->>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala index 6665766..e79f271 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala @@ -16,7 +16,6 @@ * limitations under the License. */ -<<<<<<< HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala package org.apache.gearpump.streaming.refactor.sink import akka.actor.ActorSystem @@ -30,24 +29,8 @@ object DataSinkProcessor { parallelism: Int = 1, description: String = "", taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem) - : Processor[DataSinkTask] = { + : Processor[DataSinkTask] = { Processor[DataSinkTask](parallelism, description = description, taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink)) } -======= -package org.apache.gearpump.streaming.refactor.state - -import org.apache.gearpump.streaming.refactor.state.api.State - -trait StateTag[StateT <: State] extends Serializable { - - def appendTo(sb: Appendable) - - def getId: String - - def getSpec: StateSpec[StateT] - - def bind(binder: StateBinder): StateT - ->>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala index 8832aee..f538400 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala @@ -23,16 +23,7 @@ import java.time.Instant import org.apache.gearpump.streaming.refactor.coder.Coder import org.apache.gearpump.streaming.refactor.state.api.StateInternals -<<<<<<< HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala -/** - * - */ trait RuntimeContext { -======= -trait StateSpec[StateT <: State] extends Serializable { - - def bind(id: String, binder: StateBinder): StateT ->>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala new file mode 100644 index 0000000..91cdbe5 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.state + +import org.apache.gearpump.streaming.refactor.coder.Coder +import org.apache.gearpump.streaming.refactor.state.api.State + +trait StateSpec[StateT <: State] extends Serializable { + + def bind(id: String, binder: StateBinder): StateT + + def offerCoders(coders: Array[Coder[StateT]]): Unit + + def finishSpecifying: Unit + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala new file mode 100644 index 0000000..9fa865d --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.state + +import org.apache.gearpump.streaming.refactor.state.api.State + +trait StateTag[StateT <: State] extends Serializable { + + def appendTo(sb: Appendable) + + def getId: String + + def getSpec: StateSpec[StateT] + + def bind(binder: StateBinder): StateT + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala index 0f94052..531ff66 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala @@ -31,16 +31,8 @@ import org.apache.gearpump.streaming.refactor.state.heap.HeapStateInternalsFacto import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig} import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock} import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory -import org.apache.gearpump.util.LogUtil import org.apache.gearpump.{Message, TimeStamp} -<<<<<<< HEAD -======= -object StatefulTask { - val LOG = LogUtil.getLogger(getClass) -} - ->>>>>>> e6ce91c... [Gearpump 311] refactor state management abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { @@ -60,11 +52,7 @@ abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig) // core state data var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = null -<<<<<<< HEAD def open(runtimeContext: RuntimeContext): Unit = {} -======= - def open: Unit = {} ->>>>>>> e6ce91c... [Gearpump 311] refactor state management def invoke(message: Message): Unit http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala new file mode 100644 index 0000000..5c01977 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.state.api + +trait State { + + def clear: Unit + +}
