[Gearpump 311] refactor state management

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/db8abf99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/db8abf99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/db8abf99

Branch: refs/heads/state
Commit: db8abf99fe3dd7c8d00975cbc0832cfa7e3b240f
Parents: 7068699
Author: vinoyang <[email protected]>
Authored: Wed Jun 21 23:51:47 2017 +0800
Committer: vinoyang <[email protected]>
Committed: Sun Jul 23 10:41:01 2017 +0800

----------------------------------------------------------------------
 .../streaming/refactor/coder/Coder.java         |  2 --
 .../dsl/window/impl/ReduceFnRunner.scala        |  8 +----
 .../refactor/sink/DataSinkProcessor.scala       | 21 ++-----------
 .../refactor/state/RuntimeContext.scala         |  9 ------
 .../streaming/refactor/state/StateSpec.scala    | 32 +++++++++++++++++++
 .../streaming/refactor/state/StateTag.scala     | 33 ++++++++++++++++++++
 .../streaming/refactor/state/StatefulTask.scala | 12 -------
 .../streaming/refactor/state/api/State.scala    | 25 +++++++++++++++
 8 files changed, 93 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
index e1999ed..edbe9a1 100644
--- 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
@@ -19,7 +19,6 @@
 package org.apache.gearpump.streaming.refactor.coder;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.CountingOutputStream;
 
@@ -130,5 +129,4 @@ public abstract class Coder<T> implements Serializable {
                     coder, Joiner.on("%n  ").join(reasons));
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
index e706f4f..3fb8034 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
@@ -20,16 +20,10 @@ package 
org.apache.gearpump.streaming.refactor.dsl.window.impl
 import org.apache.gearpump.Message
 import org.apache.gearpump.streaming.dsl.window.api.Trigger
 
-<<<<<<< 
HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
 trait ReduceFnRunner {
 
   def process(message: Message): Unit
 
   def onTrigger(trigger: Trigger): Unit
-=======
-trait State {
 
-  def clear: Unit
->>>>>>> e6ce91c... [Gearpump 311] refactor state 
management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
index 6665766..e79f271 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-<<<<<<< 
HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
 package org.apache.gearpump.streaming.refactor.sink
 
 import akka.actor.ActorSystem
@@ -30,24 +29,8 @@ object DataSinkProcessor {
       parallelism: Int = 1,
       description: String = "",
       taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
-    : Processor[DataSinkTask] = {
+  : Processor[DataSinkTask] = {
     Processor[DataSinkTask](parallelism, description = description,
       taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink))
   }
-=======
-package org.apache.gearpump.streaming.refactor.state
-
-import org.apache.gearpump.streaming.refactor.state.api.State
-
-trait StateTag[StateT <: State] extends Serializable {
-
-  def appendTo(sb: Appendable)
-
-  def getId: String
-
-  def getSpec: StateSpec[StateT]
-
-  def bind(binder: StateBinder): StateT
-
->>>>>>> e6ce91c... [Gearpump 311] refactor state 
management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
index 8832aee..f538400 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
@@ -23,16 +23,7 @@ import java.time.Instant
 import org.apache.gearpump.streaming.refactor.coder.Coder
 import org.apache.gearpump.streaming.refactor.state.api.StateInternals
 
-<<<<<<< 
HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
-/**
- *
- */
 trait RuntimeContext {
-=======
-trait StateSpec[StateT <: State] extends Serializable {
-
-  def bind(id: String, binder: StateBinder): StateT
->>>>>>> e6ce91c... [Gearpump 311] refactor state 
management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala
 
   def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala
new file mode 100644
index 0000000..91cdbe5
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.State
+
+trait StateSpec[StateT <: State] extends Serializable {
+
+  def bind(id: String, binder: StateBinder): StateT
+
+  def offerCoders(coders: Array[Coder[StateT]]): Unit
+
+  def finishSpecifying: Unit
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala
new file mode 100644
index 0000000..9fa865d
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import org.apache.gearpump.streaming.refactor.state.api.State
+
+trait StateTag[StateT <: State] extends Serializable {
+
+  def appendTo(sb: Appendable)
+
+  def getId: String
+
+  def getSpec: StateSpec[StateT]
+
+  def bind(binder: StateBinder): StateT
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
index 0f94052..531ff66 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
@@ -31,16 +31,8 @@ import 
org.apache.gearpump.streaming.refactor.state.heap.HeapStateInternalsFacto
 import org.apache.gearpump.streaming.state.impl.{CheckpointManager, 
PersistentStateConfig}
 import org.apache.gearpump.streaming.task.{Task, TaskContext, 
UpdateCheckpointClock}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
-import org.apache.gearpump.util.LogUtil
 import org.apache.gearpump.{Message, TimeStamp}
 
-<<<<<<< HEAD
-=======
-object StatefulTask {
-  val LOG = LogUtil.getLogger(getClass)
-}
-
->>>>>>> e6ce91c... [Gearpump 311] refactor state management
 abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
 
@@ -60,11 +52,7 @@ abstract class StatefulTask(taskContext: TaskContext, conf: 
UserConfig)
   // core state data
   var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = 
null
 
-<<<<<<< HEAD
   def open(runtimeContext: RuntimeContext): Unit = {}
-=======
-  def open: Unit = {}
->>>>>>> e6ce91c... [Gearpump 311] refactor state management
 
   def invoke(message: Message): Unit
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/db8abf99/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala
new file mode 100644
index 0000000..5c01977
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+trait State {
+
+  def clear: Unit
+
+}

Reply via email to