This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 07cdc32eb70303a1abc628d14669c3208ab202cc
Author: Grant Henke <[email protected]>
AuthorDate: Sun Feb 16 18:57:16 2020 -0600

    KUDU-1563. Add a feature flag for IGNORE operations
    
    This patch adds a master server feature flag to indicate that the cluster
    supports `IGNORE` operations. This includes INSERT_IGNORE, DELETE_IGNORE,
    and UPDATE_IGNORE. Though this is technically a tserver feature, it is
    unreasonable to check if every tablet server supports this feature.
    Instead we use the master as a proxy.
    
    In the future KUDU-3211 will add more complete cluster feature flag support.
    
    Additionally this patch leverages the feature flag in the Java client on a
    PingRequest to implement a `supportsIgnoreOperations()` method. This
    functionality will be used in follow on patches to add ignore operation support
    to the Spark and Backup integrations in a compatible way.
    
    Change-Id: I329bd8bde73d247240ae597b677e2cc20a92343a
    Reviewed-on: http://gerrit.cloudera.org:8080/16698
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
    Reviewed-by: Attila Bukor <[email protected]>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    | 37 ++++++++++++++++++++++
 .../java/org/apache/kudu/client/KuduClient.java    |  9 ++++++
 .../java/org/apache/kudu/client/PingRequest.java   | 31 ++++++++++++++++--
 .../apache/kudu/client/TestAsyncKuduClient.java    | 18 +++++++++++
 src/kudu/master/master.proto                       |  4 +++
 src/kudu/master/master_service.cc                  |  7 ++++
 6 files changed, 104 insertions(+), 2 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index bc01475..13162a1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -28,6 +28,7 @@ package org.apache.kudu.client;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
+import static org.apache.kudu.rpc.RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_INVALID_REQUEST;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -2644,6 +2645,42 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
+   * Sends a request to the master to check if the cluster supports ignore operations.
+   * @return true if the cluster supports ignore operations
+   */
+  @InterfaceAudience.Private
+  public Deferred<Boolean> supportsIgnoreOperations() {
+    PingRequest ping = PingRequest.makeMasterPingRequest(
+        this.masterTable, timer, defaultAdminOperationTimeoutMs);
+    ping.addRequiredFeature(Master.MasterFeatures.IGNORE_OPERATIONS_VALUE);
+    Deferred<PingResponse> response = sendRpcToTablet(ping);
+    return AsyncUtil.addBoth(response, new PingSupportsFeatureCallback());
+  }
+
+  private static final class PingSupportsFeatureCallback implements Callback<Boolean, Object> {
+    @Override
+    public Boolean call(final Object resp) {
+      if (resp instanceof Exception) {
+        // The server returns an RpcRemoteException when the required feature is not supported.
+        // The exception should have an ERROR_INVALID_REQUEST error code and at least one
+        // unsupported feature flag.
+        if (resp instanceof RpcRemoteException &&
+            ((RpcRemoteException) resp).getErrPB().getCode() == ERROR_INVALID_REQUEST &&
+            ((RpcRemoteException) resp).getErrPB().getUnsupportedFeatureFlagsCount() >= 1) {
+          return false;
+        }
+        throw new IllegalStateException((Exception) resp);
+      }
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "ping supports ignore operations";
+    }
+  }
+
+  /**
    * Builder class to use in order to connect to Kudu.
    * All the parameters beyond those in the constructors are optional.
    */
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index 8062f16..7b58be9 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -411,6 +411,15 @@ public class KuduClient implements AutoCloseable {
   }
 
   /**
+   * Sends a request to the master to check if the cluster supports ignore operations.
+   * @return true if the cluster supports ignore operations
+   */
+  @InterfaceAudience.Private
+  public boolean supportsIgnoreOperations() throws KuduException {
+    return joinAndHandleException(asyncClient.supportsIgnoreOperations());
+  }
+
+  /**
    * @return a HostAndPort describing the current leader master
    * @throws KuduException if a leader master could not be found in time
    */
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
index cbb8e3c..6651cc0 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
@@ -17,6 +17,10 @@
 
 package org.apache.kudu.client;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 import com.google.protobuf.Message;
 import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -33,9 +37,14 @@ import org.apache.kudu.util.Pair;
 class PingRequest extends KuduRpc<PingResponse> {
 
   private final String serviceName;
+  private final List<Integer> requiredFeatures = new ArrayList<>();
 
   static PingRequest makeMasterPingRequest() {
-    return new PingRequest(MASTER_SERVICE_NAME, null, 0);
+    return makeMasterPingRequest(null, null, 0);
+  }
+
+  static PingRequest makeMasterPingRequest(KuduTable masterTable, Timer timer, long timeoutMillis) {
+    return new PingRequest(masterTable, MASTER_SERVICE_NAME, timer, timeoutMillis);
   }
 
   static PingRequest makeTabletServerPingRequest() {
@@ -43,10 +52,28 @@ class PingRequest extends KuduRpc<PingResponse> {
   }
 
   private PingRequest(String serviceName, Timer timer, long timeoutMillis) {
-    super(null, timer, timeoutMillis);
+    this(null, serviceName, timer, timeoutMillis);
+  }
+
+  private PingRequest(KuduTable table, String serviceName, Timer timer, long timeoutMillis) {
+    super(table, timer, timeoutMillis);
     this.serviceName = serviceName;
   }
 
+  /**
+   * Add an application-specific feature flag required to service the RPC.
+   * This can be useful on the Ping request to check if a service supports a feature.
+   * The server will respond with an RpcRemoteException if a feature is not supported.
+   */
+  void addRequiredFeature(Integer feature) {
+    requiredFeatures.add(feature);
+  }
+
+  @Override
+  Collection<Integer> getRequiredFeatures() {
+    return requiredFeatures;
+  }
+
   @Override
   Message createRequestPB() {
     return Master.PingRequestPB.getDefaultInstance();
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index b1eb798..3c9a4fe 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -18,6 +18,7 @@
 package org.apache.kudu.client;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static junit.framework.TestCase.assertFalse;
 import static org.apache.kudu.test.ClientTestUtil.countRowsInScan;
 import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
 import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
@@ -279,4 +280,21 @@ public class TestAsyncKuduClient {
       assertTrue(nre.getMessage().startsWith("Got out-of-order key column"));
     }
   }
+
+  /**
+   * Test supportsIgnoreOperations() when the cluster does support them.
+   */
+  @Test(timeout = 100000)
+  public void testSupportsIgnoreOperationsTrue() throws Exception {
+    assertTrue(asyncClient.supportsIgnoreOperations().join());
+  }
+
+  /**
+   * Test supportsIgnoreOperations() when the cluster does not support them.
+   */
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = { "--master_support_ignore_operations=false" })
+  public void testSupportsIgnoreOperationsFalse() throws Exception {
+    assertFalse(asyncClient.supportsIgnoreOperations().join());
+  }
 }
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 4898509..750761f 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -1007,6 +1007,10 @@ enum MasterFeatures {
   GENERATE_AUTHZ_TOKEN = 5;
   // The master supports dynamic addition/removal of masters
   DYNAMIC_MULTI_MASTER = 6;
+  // Whether the cluster supports INSERT_IGNORE, DELETE_IGNORE, and UPDATE_IGNORE operations.
+  // Though this is technically a tserver feature, it's unreasonable to check if every
+  // tablet server supports this feature. Instead we use the master as a proxy.
+  IGNORE_OPERATIONS = 7;
 }
 
 service MasterService {
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index f1d30c9..6ef5406 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -101,6 +101,11 @@ DEFINE_bool(master_support_change_config, false,
 TAG_FLAG(master_support_change_config, hidden);
 TAG_FLAG(master_support_change_config, unsafe);
 
+DEFINE_bool(master_support_ignore_operations, true,
+            "Whether the cluster supports support ignore operations.");
+TAG_FLAG(master_support_ignore_operations, hidden);
+TAG_FLAG(master_support_ignore_operations, runtime);
+
 
 using google::protobuf::Message;
 using kudu::consensus::ReplicaManagementInfoPB;
@@ -798,6 +803,8 @@ bool MasterServiceImpl::SupportsFeature(uint32_t feature) const {
       return FLAGS_master_support_connect_to_master_rpc;
     case MasterFeatures::DYNAMIC_MULTI_MASTER:
       return FLAGS_master_support_change_config;
+    case MasterFeatures::IGNORE_OPERATIONS:
+      return FLAGS_master_support_ignore_operations;
     default:
       return false;
   }

Reply via email to