This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 07cdc32eb70303a1abc628d14669c3208ab202cc Author: Grant Henke <[email protected]> AuthorDate: Sun Feb 16 18:57:16 2020 -0600 KUDU-1563. Add a feature flag for IGNORE operations This patch adds a master server feature flag to indicate that the cluster supports `IGNORE` operations. This includes INSERT_IGNORE, DELETE_IGNORE, and UPDATE_IGNORE. Though this is technically a tserver feature, it is unreasonable to check if every tablet server supports this feature. Instead we use the master as a proxy. In the future KUDU-3211 will add more complete cluster feature flag support. Additionally this patch leverages the feature flag in the Java client on a PingRequest to implement a `supportsIgnoreOperations()` method. This functionality will be used in follow on patches to add ignore operation support to the Spark and Backup integrations in a compatible way. Change-Id: I329bd8bde73d247240ae597b677e2cc20a92343a Reviewed-on: http://gerrit.cloudera.org:8080/16698 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> Reviewed-by: Attila Bukor <[email protected]> --- .../org/apache/kudu/client/AsyncKuduClient.java | 37 ++++++++++++++++++++++ .../java/org/apache/kudu/client/KuduClient.java | 9 ++++++ .../java/org/apache/kudu/client/PingRequest.java | 31 ++++++++++++++++-- .../apache/kudu/client/TestAsyncKuduClient.java | 18 +++++++++++ src/kudu/master/master.proto | 4 +++ src/kudu/master/master_service.cc | 7 ++++ 6 files changed, 104 insertions(+), 2 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index bc01475..13162a1 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -28,6 +28,7 @@ package org.apache.kudu.client; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static 
org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED; +import static org.apache.kudu.rpc.RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_INVALID_REQUEST; import java.net.InetAddress; import java.net.UnknownHostException; @@ -2644,6 +2645,42 @@ public class AsyncKuduClient implements AutoCloseable { } /** + * Sends a request to the master to check if the cluster supports ignore operations. + * @return true if the cluster supports ignore operations + */ + @InterfaceAudience.Private + public Deferred<Boolean> supportsIgnoreOperations() { + PingRequest ping = PingRequest.makeMasterPingRequest( + this.masterTable, timer, defaultAdminOperationTimeoutMs); + ping.addRequiredFeature(Master.MasterFeatures.IGNORE_OPERATIONS_VALUE); + Deferred<PingResponse> response = sendRpcToTablet(ping); + return AsyncUtil.addBoth(response, new PingSupportsFeatureCallback()); + } + + private static final class PingSupportsFeatureCallback implements Callback<Boolean, Object> { + @Override + public Boolean call(final Object resp) { + if (resp instanceof Exception) { + // The server returns an RpcRemoteException when the required feature is not supported. + // The exception should have an ERROR_INVALID_REQUEST error code and at least one + // unsupported feature flag. + if (resp instanceof RpcRemoteException && + ((RpcRemoteException) resp).getErrPB().getCode() == ERROR_INVALID_REQUEST && + ((RpcRemoteException) resp).getErrPB().getUnsupportedFeatureFlagsCount() >= 1) { + return false; + } + throw new IllegalStateException((Exception) resp); + } + return true; + } + + @Override + public String toString() { + return "ping supports ignore operations"; + } + } + + /** * Builder class to use in order to connect to Kudu. * All the parameters beyond those in the constructors are optional. 
*/ diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java index 8062f16..7b58be9 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java @@ -411,6 +411,15 @@ public class KuduClient implements AutoCloseable { } /** + * Sends a request to the master to check if the cluster supports ignore operations. + * @return true if the cluster supports ignore operations + */ + @InterfaceAudience.Private + public boolean supportsIgnoreOperations() throws KuduException { + return joinAndHandleException(asyncClient.supportsIgnoreOperations()); + } + + /** * @return a HostAndPort describing the current leader master * @throws KuduException if a leader master could not be found in time */ diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java index cbb8e3c..6651cc0 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java @@ -17,6 +17,10 @@ package org.apache.kudu.client; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import com.google.protobuf.Message; import io.netty.util.Timer; import org.apache.yetus.audience.InterfaceAudience; @@ -33,9 +37,14 @@ import org.apache.kudu.util.Pair; class PingRequest extends KuduRpc<PingResponse> { private final String serviceName; + private final List<Integer> requiredFeatures = new ArrayList<>(); static PingRequest makeMasterPingRequest() { - return new PingRequest(MASTER_SERVICE_NAME, null, 0); + return makeMasterPingRequest(null, null, 0); + } + + static PingRequest makeMasterPingRequest(KuduTable masterTable, Timer timer, long timeoutMillis) { + return new PingRequest(masterTable, MASTER_SERVICE_NAME, 
timer, timeoutMillis); } static PingRequest makeTabletServerPingRequest() { @@ -43,10 +52,28 @@ class PingRequest extends KuduRpc<PingResponse> { } private PingRequest(String serviceName, Timer timer, long timeoutMillis) { - super(null, timer, timeoutMillis); + this(null, serviceName, timer, timeoutMillis); + } + + private PingRequest(KuduTable table, String serviceName, Timer timer, long timeoutMillis) { + super(table, timer, timeoutMillis); this.serviceName = serviceName; } + /** + * Add an application-specific feature flag required to service the RPC. + * This can be useful on the Ping request to check if a service supports a feature. + * The server will respond with an RpcRemoteException if a feature is not supported. + */ + void addRequiredFeature(Integer feature) { + requiredFeatures.add(feature); + } + + @Override + Collection<Integer> getRequiredFeatures() { + return requiredFeatures; + } + @Override Message createRequestPB() { return Master.PingRequestPB.getDefaultInstance(); diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java index b1eb798..3c9a4fe 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java @@ -18,6 +18,7 @@ package org.apache.kudu.client; import static java.nio.charset.StandardCharsets.UTF_8; +import static junit.framework.TestCase.assertFalse; import static org.apache.kudu.test.ClientTestUtil.countRowsInScan; import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert; import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions; @@ -279,4 +280,21 @@ public class TestAsyncKuduClient { assertTrue(nre.getMessage().startsWith("Got out-of-order key column")); } } + + /** + * Test supportsIgnoreOperations() when the cluster does support them. 
+ */ + @Test(timeout = 100000) + public void testSupportsIgnoreOperationsTrue() throws Exception { + assertTrue(asyncClient.supportsIgnoreOperations().join()); + } + + /** + * Test supportsIgnoreOperations() when the cluster does not support them. + */ + @Test(timeout = 100000) + @KuduTestHarness.MasterServerConfig(flags = { "--master_support_ignore_operations=false" }) + public void testSupportsIgnoreOperationsFalse() throws Exception { + assertFalse(asyncClient.supportsIgnoreOperations().join()); + } } diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto index 4898509..750761f 100644 --- a/src/kudu/master/master.proto +++ b/src/kudu/master/master.proto @@ -1007,6 +1007,10 @@ enum MasterFeatures { GENERATE_AUTHZ_TOKEN = 5; // The master supports dynamic addition/removal of masters DYNAMIC_MULTI_MASTER = 6; + // Whether the cluster supports INSERT_IGNORE, DELETE_IGNORE, and UPDATE_IGNORE operations. + // Though this is technically a tserver feature, it's unreasonable to check if every + // tablet server supports this feature. Instead we use the master as a proxy. 
+ IGNORE_OPERATIONS = 7; } service MasterService { diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc index f1d30c9..6ef5406 100644 --- a/src/kudu/master/master_service.cc +++ b/src/kudu/master/master_service.cc @@ -101,6 +101,11 @@ DEFINE_bool(master_support_change_config, false, TAG_FLAG(master_support_change_config, hidden); TAG_FLAG(master_support_change_config, unsafe); +DEFINE_bool(master_support_ignore_operations, true, + "Whether the cluster supports ignore operations."); +TAG_FLAG(master_support_ignore_operations, hidden); +TAG_FLAG(master_support_ignore_operations, runtime); + using google::protobuf::Message; using kudu::consensus::ReplicaManagementInfoPB; @@ -798,6 +803,8 @@ bool MasterServiceImpl::SupportsFeature(uint32_t feature) const { return FLAGS_master_support_connect_to_master_rpc; case MasterFeatures::DYNAMIC_MULTI_MASTER: return FLAGS_master_support_change_config; + case MasterFeatures::IGNORE_OPERATIONS: + return FLAGS_master_support_ignore_operations; default: return false; }
