[ 
https://issues.apache.org/jira/browse/GEODE-10012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500310#comment-17500310
 ] 

ASF GitHub Bot commented on GEODE-10012:
----------------------------------------

pivotal-jbarrett commented on a change in pull request #920:
URL: https://github.com/apache/geode-native/pull/920#discussion_r817929905



##########
File path: cppcache/src/ExecutionImpl.hpp
##########
@@ -78,6 +78,20 @@ class ExecutionImpl {
   static void addResults(std::shared_ptr<ResultCollector>& collector,
                          const std::shared_ptr<CacheableVector>& results);
 
+ protected:
+  std::shared_ptr<CacheableVector> executeOnPool(
+      const std::string& func, FunctionAttributes funcAttrs, int32_t 
retryAttempts,
+      std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT);
+
+  void executeOnAllServers(
+      const std::string& func, FunctionAttributes funcAttrs,
+      std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT);
+
+  FunctionAttributes getFunctionAttributes(
+      const std::string& func);

Review comment:
       Please use full names, like `functionId` here.

##########
File path: cppcache/src/ExecutionImpl.hpp
##########
@@ -78,6 +78,20 @@ class ExecutionImpl {
   static void addResults(std::shared_ptr<ResultCollector>& collector,
                          const std::shared_ptr<CacheableVector>& results);
 
+ protected:
+  std::shared_ptr<CacheableVector> executeOnPool(
+      const std::string& func, FunctionAttributes funcAttrs, int32_t 
retryAttempts,
+      std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT);
+
+  void executeOnAllServers(
+      const std::string& func, FunctionAttributes funcAttrs,
+      std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT);
+
+  FunctionAttributes getFunctionAttributes(
+      const std::string& func);
+  FunctionAttributes updateFunctionAttributes(const std::string &funcName);
+  GfErrType getFuncAttributes(const std::string& func, FunctionAttributes& 
attr);

Review comment:
       `functionId` and `functionAttributes` please.

##########
File path: cppcache/src/ExecutionImpl.cpp
##########
@@ -329,26 +281,23 @@ std::shared_ptr<ResultCollector> ExecutionImpl::execute(
           "Execution::execute: Transaction function execution on pool is not "
           "supported");
     }
-    if (m_allServer == false) {
-      executeOnPool(
-          func, isHAHasResultOptimizeForWrite,
-          (isHAHasResultOptimizeForWrite & 1) ? m_pool->getRetryAttempts() : 0,
-          timeout);
-      if (serverHasResult == true) {
-        // ExecutionImpl::addResults(m_rc, rs);
+    if (!m_allServer) {
+      executeOnPool(func, attrs, attrs.isHA() ? m_pool->getRetryAttempts() : 0,
+                    timeout);
+      if (attrs.hasResult()) {
         m_rc->endResults();
       }
       return m_rc;
     }
-    executeOnAllServers(func, isHAHasResultOptimizeForWrite, timeout);
+    executeOnAllServers(func, attrs, timeout);
   } else {
     throw IllegalStateException("Execution::execute: should not be here");
   }
   return m_rc;
 }
 
-GfErrType ExecutionImpl::getFuncAttributes(
-    const std::string& func, std::shared_ptr<std::vector<int8_t>>* attr) {
+GfErrType ExecutionImpl::getFuncAttributes(const std::string& func,
+                                           FunctionAttributes& attr) {

Review comment:
       Does it make more sense to have the function return `FunctionAttributes` and `throw` exceptions on failure, instead of returning a `GfErrType` and filling in an out-parameter?

##########
File path: cppcache/src/ExecutionImpl.cpp
##########
@@ -406,24 +355,25 @@ void ExecutionImpl::addResults(
 }
 
 void ExecutionImpl::executeOnAllServers(const std::string& func,
-                                        uint8_t getResult,
+                                        FunctionAttributes funcAttrs,

Review comment:
       Is the copy construction cheaper, or even necessary, here rather than passing by `const &`?

##########
File path: cppcache/src/ThinClientRegion.cpp
##########
@@ -2907,18 +2911,20 @@ void ThinClientRegion::executeFunction(
         rc->clearResults();
         failedNodes->clear();
       } else if (err == GF_TIMEOUT) {
-        LOGINFO("function timeout. Name: %s, timeout: %s, params: %" PRIu8
-                ", "
-                "retryAttempts: %d ",
-                func.c_str(), to_string(timeout).c_str(), getResult,
-                retryAttempts);
+        LOGINFO(
+            "function timeout. Name: %s, timeout: %s, FunctionState: %" PRIu8
+            ", "
+            "retryAttempts: %d ",

Review comment:
       Clean up the formatting here.

##########
File path: cppcache/src/FunctionExecution.cpp
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FunctionExecution.hpp"
+
+#include <geode/ResultCollector.hpp>
+
+#include "CacheImpl.hpp"
+#include "TcrConnectionManager.hpp"
+#include "TcrMessage.hpp"
+#include "ThinClientPoolDM.hpp"
+#include "ThinClientRegion.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+
+void FunctionExecution::setParameters(
+    const std::string& funcName, FunctionAttributes funcAttrs,
+    std::chrono::milliseconds timeout, std::shared_ptr<Cacheable> args,
+    TcrEndpoint* ep, ThinClientPoolDM* poolDM,
+    std::shared_ptr<std::recursive_mutex> resultCollectorMutex,
+    std::shared_ptr<ResultCollector> resultCollector,
+    std::shared_ptr<UserAttributes> userAttr) {
+  exceptionMsg_.clear();
+  resultCollectorMutex_ = std::move(resultCollectorMutex);
+  resultCollector_ = resultCollector;
+  error_ = GF_NOTCON;
+  funcName_ = funcName;

Review comment:
       Again, full names here.

##########
File path: cppcache/src/ExecutionImpl.hpp
##########
@@ -100,29 +114,18 @@ class ExecutionImpl {
         m_allServer(allServer),
         m_pool(pool),
         m_authenticatedView(authenticatedView) {}
+ protected:
+
   std::shared_ptr<CacheableVector> m_routingObj;
   std::shared_ptr<Cacheable> m_args;
   std::shared_ptr<ResultCollector> m_rc;
   std::shared_ptr<Region> m_region;
   bool m_allServer;
   std::shared_ptr<Pool> m_pool;
   AuthenticatedView* m_authenticatedView;
-  static std::recursive_mutex m_func_attrs_lock;
-  static FunctionToFunctionAttributes m_func_attrs;
-  //  std::vector<int8_t> m_attributes;
-
-  std::shared_ptr<CacheableVector> executeOnPool(
-      const std::string& func, uint8_t getResult, int32_t retryAttempts,
-      std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT);
 
-  void executeOnAllServers(
-      const std::string& func, uint8_t getResult,
-      std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT);
-
-  std::shared_ptr<std::vector<int8_t>> getFunctionAttributes(
-      const std::string& func);
-  GfErrType getFuncAttributes(const std::string& func,
-                              std::shared_ptr<std::vector<int8_t>>* attr);
+  std::map<std::string, FunctionAttributes> funcAttrs_;

Review comment:
       Yes! Thanks for removing another global!
   Please use full names, not abbreviated names.
   `functionAttributes_`
   `functionAttributesMutex_`

##########
File path: cppcache/src/FunctionAttributes.hpp
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#ifndef GEODE_FUNCTIONATTRIBUTES_H_
+#define GEODE_FUNCTIONATTRIBUTES_H_
+
+#include <cstdint>
+
+namespace apache {
+namespace geode {
+namespace client {
+
+class FunctionAttributes {
+ public:
+  enum : uint8_t {

Review comment:
       Please use `enum class`.

##########
File path: cppcache/integration/test/FunctionExecutionTest.cpp
##########
@@ -276,31 +318,164 @@ TEST(FunctionExecutionTest, 
FunctionExecutionWithIncompleteBucketLocations) {
   cache.close();
 }
 
-std::shared_ptr<CacheableVector> populateRegionReturnFilter(
-    const std::shared_ptr<Region> &region, const int numberOfPuts) {
-  auto routingObj = CacheableVector::create();
-  for (int i = 0; i < numberOfPuts; i++) {
-    region->put("KEY--" + std::to_string(i), "VALUE--" + std::to_string(i));
-    routingObj->push_back(CacheableKey::create("KEY--" + std::to_string(i)));
-  }
-  return routingObj;
+TEST(FunctionExecutionTest, shNonHAFunctionExecServerTimeout) {
+  Cluster cluster{LocatorCount{1}, ServerCount{3}};
+
+  cluster.start([&]() {
+    cluster.getGfsh()
+        .deploy()
+        .jar(getFrameworkString(FrameworkVariable::JavaObjectJarPath))
+        .execute();
+  });
+
+  cluster.getGfsh()
+      .create()
+      .region()
+      .withName("partition_region")
+      .withType("PARTITION")
+      .withRedundantCopies("1")
+      .execute();
+
+  auto cache = CacheFactory().set("log-level", "none").create();
+  auto poolFactory = cache.getPoolManager().createFactory();
+
+  cluster.applyLocators(poolFactory);
+
+  auto pool =
+      poolFactory.setPRSingleHopEnabled(true).setRetryAttempts(2).create(
+          "pool");
+
+  auto region = cache.createRegionFactory(RegionShortcut::PROXY)
+                    .setPoolName("pool")
+                    .create("partition_region");
+
+  // Populate region in a way that not all buckets are created.
+  // Servers in this case will create 88 of possible 113 buckets.
+  populateRegionAllBuckets(region);
+
+  // Check that PR metadata is updated. This is done to be sure
+  // that client will execute function in a non single hop manner
+  // because metadata doesn't contain all bucket locations.
+  // After metadata is refreshed, it will contain at least one
+  // bucket location.
+  CacheImpl *cacheImpl = CacheRegionHelper::getCacheImpl(&cache);
+  waitUntilPRMetadataIsRefreshed(cacheImpl);
+
+  auto functionService = FunctionService::onRegion(region);
+  auto resultCollector = std::make_shared<TestResultCollector>();
+
+  resultCollector->lock();
+
+  auto runner =
+      std::async(std::launch::async, [&functionService, resultCollector]() {
+        functionService.withCollector(resultCollector)
+            .execute("MultiGetAllFunctionTimeoutNonHA");
+      });
+
+  std::this_thread::sleep_for(std::chrono::seconds{25});

Review comment:
       We have tests with examples of using synchronization primitives instead of sleeping.

##########
File path: cppcache/integration/test/FunctionExecutionTest.cpp
##########
@@ -74,6 +99,9 @@ class TestResultCollector : public ResultCollector {
   }
 
   virtual void addResult(const std::shared_ptr<Cacheable> &result) override {
+    LOGINFO("Before mutex lock!");
+    std::lock_guard<decltype(mutex_)> lock{mutex_};
+    LOGINFO("Adding a new result!");

Review comment:
       Tests shouldn't be logging, period. If the behavior is worth logging, then it should just be asserted.

##########
File path: cppcache/src/FunctionExecution.cpp
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FunctionExecution.hpp"
+
+#include <geode/ResultCollector.hpp>
+
+#include "CacheImpl.hpp"
+#include "TcrConnectionManager.hpp"
+#include "TcrMessage.hpp"
+#include "ThinClientPoolDM.hpp"
+#include "ThinClientRegion.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+
+void FunctionExecution::setParameters(
+    const std::string& funcName, FunctionAttributes funcAttrs,
+    std::chrono::milliseconds timeout, std::shared_ptr<Cacheable> args,
+    TcrEndpoint* ep, ThinClientPoolDM* poolDM,
+    std::shared_ptr<std::recursive_mutex> resultCollectorMutex,
+    std::shared_ptr<ResultCollector> resultCollector,
+    std::shared_ptr<UserAttributes> userAttr) {
+  exceptionMsg_.clear();
+  resultCollectorMutex_ = std::move(resultCollectorMutex);
+  resultCollector_ = resultCollector;
+  error_ = GF_NOTCON;
+  funcName_ = funcName;
+  funcAttrs_ = funcAttrs;
+  timeout_ = timeout;
+  args_ = args;
+  endpoint_ = ep;
+  pool_ = poolDM;
+  userAttrs_ = userAttr;
+}
+
+GfErrType FunctionExecution::execute() {
+  GuardUserAttributes gua;
+
+  if (userAttrs_) {
+    gua.setAuthenticatedView(userAttrs_->getAuthenticatedView());
+  }
+
+  TcrMessageExecuteFunction request(
+      new DataOutput(
+          pool_->getConnectionManager().getCacheImpl()->createDataOutput()),
+      funcName_, args_, funcAttrs_, pool_, timeout_);
+  TcrMessageReply reply(true, pool_);
+
+  auto resultProcessor = std::unique_ptr<ChunkedFunctionExecutionResponse>(
+      new ChunkedFunctionExecutionResponse(reply, funcAttrs_.hasResult(),
+                                           resultCollector_,
+                                           resultCollectorMutex_));
+
+  reply.setChunkedResultHandler(resultProcessor.get());
+  reply.setTimeout(timeout_);
+  reply.setDM(pool_);
+
+  LOGDEBUG(
+      "ThinClientPoolDM::sendRequestToAllServer sendRequest on endpoint[%s]!",
+      endpoint_->name().c_str());
+
+  error_ = pool_->sendRequestToEP(request, reply, endpoint_);
+  error_ = pool_->handleEPError(endpoint_, reply, error_);
+  if (error_ != GF_NOERR) {
+    if (error_ == GF_NOTCON) {
+      return GF_NOERR;  // if server is unavailable its not an error for
+      // functionexec OnServers() case
+    }
+    LOGDEBUG("FunctionExecution::execute failed on endpoint[%s]!. Error = %d ",
+             endpoint_->name().c_str(), error_);
+    if (reply.getMessageType() == TcrMessage::EXCEPTION) {
+      exceptionMsg_ = reply.getException();
+    }
+
+    return error_;
+  } else if (reply.getMessageType() == TcrMessage::EXCEPTION ||
+             reply.getMessageType() == TcrMessage::EXECUTE_FUNCTION_ERROR) {
+    error_ = ThinClientRegion::handleServerException("Execute",
+                                                     reply.getException());
+    exceptionMsg_ = reply.getException();
+  }
+
+  return error_;
+}
+
+OnRegionFunctionExecution::OnRegionFunctionExecution(
+    std::string funcName, const Region* region, std::shared_ptr<Cacheable> 
args,
+    std::shared_ptr<CacheableHashSet> routingObj, FunctionAttributes funcAttrs,
+    std::chrono::milliseconds timeout, ThinClientPoolDM* poolDM,
+    const std::shared_ptr<std::recursive_mutex>& collectorMutex,
+    std::shared_ptr<ResultCollector> collector,
+    std::shared_ptr<UserAttributes> userAttrs, bool isBGThread,
+    const std::shared_ptr<BucketServerLocation>& serverLocation,
+    bool allBuckets)
+    : serverLocation_{serverLocation},
+      backgroundThread_{isBGThread},
+      pool_{poolDM},
+      funcName_{funcName},

Review comment:
       Probably `std::move` both the `functionId` and the `functionAttributes` here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@geode.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid retries for non-HA function execution
> -------------------------------------------
>
>                 Key: GEODE-10012
>                 URL: https://issues.apache.org/jira/browse/GEODE-10012
>             Project: Geode
>          Issue Type: Bug
>          Components: native client
>            Reporter: Mario Salazar de Torres
>            Assignee: Mario Salazar de Torres
>            Priority: Major
>              Labels: needsTriage, pull-request-available
>
> *GIVEN* a cluster with 3 servers and 1 locator
> *AND* a PartitionedRegion with redundant-copies="1"
> *AND* a user-defined Function with isHA=false
> *AND* a geode-native client with a pool with single-hop enabled and retry 
> attempts set to 2
> *WHEN* execution fails due to a connectivity issue, a connection timeout, or an 
> internal cluster error
> *THEN* the function is retried twice
> ----
> *Additional information.* The function is retried at the network code layer. 
> Because the BSL (BucketServerLocation) is removed from the ClientMetadataService 
> whenever there is a timeout/connectivity error, these retries may cause an 
> *InternalFunctionInvocationTargetException* to be thrown, indicating: 
> {code:java}
> Multiple target nodes found for single hop operation {code}
> Also, take into account that the Java client API never retries a function 
> execution if isHA=false for that function. When isHA=true, the function is 
> retried, but by the function execution logic rather than at the networking 
> layer.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to