style95 commented on a change in pull request #5031:
URL: https://github.com/apache/openwhisk/pull/5031#discussion_r569996413



##########
File path: ansible/roles/controller/tasks/deploy.yml
##########
@@ -272,6 +272,11 @@
 
       "CONFIG_whisk_controller_activation_pollingFromDb": "{{ 
controller_activation_pollingFromDb | default(true) | lower }}"
 
+      "CONFIG_whisk_etcd_hosts": "{{ etcd_connect_string }}"

Review comment:
       Since ShardingContainerPoolBalancer does not use etcd, should we 
selectively configure these configurations?

##########
File path: 
tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdKvTests.scala
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common.etcd
+
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.entity.InvokerInstanceId
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.etcd.EtcdKV.InvokerKeys
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import pureconfig.loadConfigOrThrow
+
+@RunWith(classOf[JUnitRunner])
+class EtcdKvTests extends FlatSpec with ScalaFutures with Matchers {

Review comment:
       Should we need to selectively run these tests?

##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala
##########
@@ -0,0 +1,262 @@
+package org.apache.openwhisk.core.etcd
+
+import com.google.common.util.concurrent.{FutureCallback, Futures, 
ListenableFuture}
+import com.ibm.etcd.api._
+import com.ibm.etcd.client.kv.KvClient.Watch
+import com.ibm.etcd.client.kv.{KvClient, WatchUpdate}
+import com.ibm.etcd.client.{EtcdClient => Client}
+import io.grpc.stub.StreamObserver
+import java.util.concurrent.Executors
+
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.EtcdType._
+import pureconfig.loadConfigOrThrow
+import spray.json.DefaultJsonProtocol
+
+import scala.language.implicitConversions
+import scala.annotation.tailrec
+import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
+
+case class Lease(id: Long, ttl: Long)
+
+object RichListenableFuture {
+  implicit def convertToFuture[T](lf: ListenableFuture[T])(implicit ece: 
ExecutionContextExecutor): Future[T] = {
+    val p = Promise[T]()
+    Futures.addCallback(lf, new FutureCallback[T] {
+      def onFailure(t: Throwable): Unit = p failure t
+      def onSuccess(result: T): Unit = p success result
+    }, ece)
+    p.future
+  }
+}
+
+object EtcdClient {
+  // hostAndPorts format: {HOST}:{PORT}[,{HOST}:{PORT},{HOST}:{PORT}, ...]
+  def apply(hostAndPorts: String)(implicit ece: ExecutionContextExecutor): 
EtcdClient = {
+    require(hostAndPorts != null)
+    val client: Client = 
Client.forEndpoints(hostAndPorts).withPlainText().build()
+    new EtcdClient(client)(ece)
+  }
+
+  def apply(client: Client)(implicit ece: ExecutionContextExecutor): 
EtcdClient = {
+    new EtcdClient(client)(ece)
+  }
+}
+
+class EtcdClient(val client: Client)(override implicit val ece: 
ExecutionContextExecutor)
+    extends EtcdKeyValueApi
+    with EtcdLeaseApi
+    with EtcdWatchApi
+    with EtcdLeadershipApi {
+
+  def close() = {
+    client.close()
+  }
+}
+
+trait EtcdKeyValueApi extends KeyValueStore {
+  import RichListenableFuture._
+  protected[etcd] val client: Client
+
+  override def get(key: String): Future[RangeResponse] =
+    client.getKvClient.get(key).async()
+
+  override def getPrefix(prefixKey: String): Future[RangeResponse] = {
+    client.getKvClient.get(prefixKey).asPrefix().async()
+  }
+
+  override def getCount(prefixKey: String): Future[Long] = {
+    
client.getKvClient.get(prefixKey).asPrefix().countOnly().async().map(_.getCount)
+  }
+
+  override def put(key: String, value: String): Future[PutResponse] =
+    client.getKvClient.put(key, value).async().recoverWith {
+      case t =>
+        Future.failed[PutResponse](getNestedException(t))
+    }
+
+  override def put(key: String, value: String, leaseId: Long): 
Future[PutResponse] =
+    client.getKvClient
+      .put(key, value, leaseId)
+      .async()
+      .recoverWith {
+        case t =>
+          Future.failed[PutResponse](getNestedException(t))
+      }
+
+  def put(key: String, value: Boolean): Future[PutResponse] = {
+    put(key, value.toString)
+  }
+
+  def put(key: String, value: Boolean, leaseId: Long): Future[PutResponse] = {
+    put(key, value.toString, leaseId)
+  }
+
+  override def del(key: String): Future[DeleteRangeResponse] =
+    client.getKvClient.delete(key).async().recoverWith {
+      case t =>
+        Future.failed[DeleteRangeResponse](getNestedException(t))
+    }
+
+  override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId: 
Long): Future[TxnResponse] = {
+    client.getKvClient
+      .txnIf()
+      .cmpEqual(key)
+      .version(cmpVersion)
+      .`then`()
+      .put(client.getKvClient
+        .put(key, value.toString, leaseId)
+        .asRequest())
+      .async()
+      .recoverWith {
+        case t =>
+          Future.failed[TxnResponse](getNestedException(t))
+      }
+  }
+
+  @tailrec
+  private def getNestedException(t: Throwable): Throwable = {
+    if (t.getCause == null) t
+    else getNestedException(t.getCause)
+  }
+}
+
+trait KeyValueStore {

Review comment:
       While this trait can be used to have a new store, but most of the 
methods and their signatures conform to ETCD.
   I feel like it is highly unlikely that we can replace it with another 
key-value store as it should support all functionalities such as TTL with 
lease/keepalive, Transaction, prefix, etc.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to