This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new e35ade9fb9d KAFKA-14455: Kafka Connect create and update REST APIs 
should surface failures while writing to the config topic (#12984)
e35ade9fb9d is described below

commit e35ade9fb9d34f75b102ea2371db4d137866d464
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Feb 2 21:33:38 2023 +0530

    KAFKA-14455: Kafka Connect create and update REST APIs should surface 
failures while writing to the config topic (#12984)
    
    Reviewers: Greg Harris <[email protected]>, Chris Egerton 
<[email protected]>
---
 .../java/org/apache/kafka/common/utils/Timer.java  |  10 +-
 .../runtime/distributed/DistributedHerder.java     |   1 -
 .../connect/storage/KafkaConfigBackingStore.java   | 165 ++++++++++++++++-----
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  26 +++-
 .../kafka/connect/runtime/AbstractHerderTest.java  |  26 ++++
 .../runtime/distributed/DistributedHerderTest.java |  53 +++++++
 .../storage/KafkaConfigBackingStoreTest.java       | 127 ++++++++++++++--
 .../util/clusters/EmbeddedConnectCluster.java      |   2 +-
 8 files changed, 353 insertions(+), 57 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Timer.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
index 98b09a38d36..d60eea25710 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
@@ -26,7 +26,7 @@ package org.apache.kafka.common.utils;
  * This class also ensures monotonic updates to the timer even if the 
underlying clock is subject
  * to non-monotonic behavior. For example, the remaining time returned by 
{@link #remainingMs()} is
  * guaranteed to decrease monotonically until it hits zero.
- *
+ * <p>
  * Note that it is up to the caller to ensure progress of the timer using one 
of the
  * {@link #update()} methods or {@link #sleep(long)}. The timer will cache the 
current time and
  * return it indefinitely until the timer has been updated. This allows the 
caller to limit
@@ -34,14 +34,14 @@ package org.apache.kafka.common.utils;
  * waiting a request sent through the {@link 
org.apache.kafka.clients.NetworkClient} should call
  * {@link #update()} following each blocking call to
  * {@link org.apache.kafka.clients.NetworkClient#poll(long, long)}.
- *
+ * <p>
  * A typical usage might look something like this:
  *
  * <pre>
  *     Time time = Time.SYSTEM;
  *     Timer timer = time.timer(500);
  *
- *     while (!conditionSatisfied() && timer.notExpired) {
+ *     while (!conditionSatisfied() && timer.notExpired()) {
  *         client.poll(timer.remainingMs(), timer.currentTimeMs());
  *         timer.update();
  *     }
@@ -137,7 +137,7 @@ public class Timer {
     /**
      * Update the cached current time to a specific value. In some contexts, 
the caller may already
      * have an accurate time, so this avoids unnecessary calls to system time.
-     *
+     * <p>
      * Note that if the updated current time is smaller than the cached time, 
then the update
      * is ignored.
      *
@@ -161,7 +161,7 @@ public class Timer {
     /**
      * Get the current time in milliseconds. This will return the same cached 
value until the timer
      * has been updated using one of the {@link #update()} methods or {@link 
#sleep(long)} is used.
-     *
+     * <p>
      * Note that the value returned is guaranteed to increase monotonically 
even if the underlying
      * {@link Time} implementation goes backwards. Effectively, the timer will 
just wait for the
      * time to catch up.
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 0f45be163cc..d2b7f75f0fd 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1063,7 +1063,6 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                             // Note that we use the updated connector config 
despite the fact that we don't have an updated
                             // snapshot yet. The existing task info should 
still be accurate.
                             ConnectorInfo info = new ConnectorInfo(connName, 
config, configState.tasks(connName),
-                                // validateConnectorConfig have checked the 
existence of CONNECTOR_CLASS_CONFIG
                                 connectorType(config));
                             callback.onCompletion(null, new Created<>(!exists, 
info));
                             return null;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index bceb73e1715..e9f87697308 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
@@ -33,6 +34,7 @@ import 
org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -40,11 +42,13 @@ import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.RestartRequest;
 import org.apache.kafka.connect.runtime.SessionKey;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
+import 
org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectUtils;
@@ -70,6 +74,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
@@ -79,26 +84,45 @@ import java.util.function.Supplier;
  * Provides persistent storage of Kafka Connect connector configurations in a 
Kafka topic.
  * </p>
  * <p>
- * This class manages both connector and task configurations. It tracks three 
types of configuration entries:
+ * This class manages both connector and task configurations, among other 
various configurations. It tracks seven types
+ * of records:
  * <p/>
- * 1. Connector config: map of string -> string configurations passed to the 
Connector class, with support for
+ * <ol>
+ * <li> Connector config: map of string -> string configurations passed to the 
Connector class, with support for
  * expanding this format if necessary. (Kafka key: connector-[connector-id]).
  * These configs are *not* ephemeral. They represent the source of truth. If 
the entire Connect
- * cluster goes down, this is all that is really needed to recover.
- * 2. Task configs: map of string -> string configurations passed to the Task 
class, with support for expanding
+ * cluster goes down, this is all that is really needed to recover. </li>
+ * <li> Task configs: map of string -> string configurations passed to the 
Task class, with support for expanding
  * this format if necessary. (Kafka key: task-[connector-id]-[task-id]).
- * These configs are ephemeral; they are stored here to a) disseminate them to 
all workers while
- * ensuring agreement and b) to allow faster cluster/worker recovery since the 
common case
- * of recovery (restoring a connector) will simply result in the same 
configuration as before
- * the failure.
- * 3. Task commit "configs": records indicating that previous task config 
entries should be committed and all task
+ * These configs are ephemeral; they are stored here to
+ * <ul>
+ * <li> disseminate them to all workers while ensuring agreement </li>
+ * <li> to allow faster cluster/worker recovery since the common case of 
recovery (restoring a connector) will simply
+ * result in the same task configuration as before the failure. </li>
+ * </ul>
+ * </li>
+ * <li> Task commit "configs": records indicating that previous task config 
entries should be committed and all task
  * configs for a connector can be applied. (Kafka key: commit-[connector-id].
  * This config has two effects. First, it records the number of tasks the 
connector is currently
  * running (and can therefore increase/decrease parallelism). Second, because 
each task config
  * is stored separately but they need to be applied together to ensure each 
partition is assigned
  * to a single task, this record also indicates that task configs for the 
specified connector
- * can be "applied" or "committed".
- * </p>
+ * can be "applied" or "committed". </li>
+ * <li> Connector target states: records indicating the {@link TargetState} 
for a connector </li>
+ * <li> {@link RestartRequest Restart requests}: records representing requests 
to restart a connector and / or its
+ * tasks. See <a 
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308623">KIP-745</a>
 for more
+ * details.</li>
+ * <li> Task count records: an integer value that tracks the number of 
task producers (for source connectors) that
+ * will have to be fenced out if a connector is reconfigured before bringing 
up any tasks with the new set of task
+ * configurations. This is required for exactly-once support for source 
connectors, see
+ * <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>
+ * for more details.</li>
+ * <li> Session key records: A {@link SessionKey} generated by the leader of 
the cluster when
+ * {@link ConnectProtocolCompatibility#SESSIONED} is the Connect protocol 
being used by the cluster. This session key
+ * is used to verify internal REST requests between workers in the cluster. See
+ * <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-507%3A+Securing+Internal+Connect+REST+Endpoints">KIP-507</a>
+ * for more details.</li>
+ * </ol>
  * <p>
  * This configuration is expected to be stored in a *single partition* and 
*compacted* topic. Using a single partition
  * ensures we can enforce ordering on messages, allowing Kafka to be used as a 
write ahead log. Compaction allows
@@ -148,7 +172,7 @@ import java.util.function.Supplier;
  * Because we can encounter these inconsistencies and addressing them requires 
support from the rest of the system
  * (resolving the task configuration inconsistencies requires support from the 
connector instance to regenerate updated
  * configs), this class exposes not only the current set of configs, but also 
which connectors have inconsistent data.
- * This allows users of this class (i.e., Herder implementations) to take 
action to resolve any inconsistencies. These
+ * This allows users of this class (i.e., {@link Herder} implementations) to 
take action to resolve any inconsistencies. These
  * inconsistencies should be rare (as described above, due to compaction 
combined with leader failures in the middle
  * of updating task configurations).
  * </p>
@@ -241,7 +265,8 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
             .field(ONLY_FAILED_FIELD_NAME, Schema.BOOLEAN_SCHEMA)
             .build();
 
-    private static final long READ_TO_END_TIMEOUT_MS = 30000;
+    // Visible for testing
+    static final long READ_WRITE_TOTAL_TIMEOUT_MS = 30000;
 
     private final Object lock;
     private final Converter converter;
@@ -289,6 +314,7 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
     private final boolean usesFencableWriter;
     private volatile Producer<String, byte[]> fencableProducer;
     private final Map<String, Object> fencableProducerProps;
+    private final Time time;
 
     @Deprecated
     public KafkaConfigBackingStore(Converter converter, DistributedConfig 
config, WorkerConfigTransformer configTransformer) {
@@ -296,6 +322,10 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
     }
 
     public KafkaConfigBackingStore(Converter converter, DistributedConfig 
config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> 
adminSupplier, String clientIdBase) {
+        this(converter, config, configTransformer, adminSupplier, 
clientIdBase, Time.SYSTEM);
+    }
+
+    KafkaConfigBackingStore(Converter converter, DistributedConfig config, 
WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, 
String clientIdBase, Time time) {
         this.lock = new Object();
         this.started = false;
         this.converter = converter;
@@ -320,6 +350,7 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
 
         configLog = setupAndCreateKafkaBasedLog(this.topic, config);
         this.configTransformer = configTransformer;
+        this.time = time;
     }
 
     @Override
@@ -469,8 +500,9 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
         connectConfig.put("properties", properties);
         byte[] serializedConfig = converter.fromConnectData(topic, 
CONNECTOR_CONFIGURATION_V0, connectConfig);
         try {
-            sendPrivileged(CONNECTOR_KEY(connector), serializedConfig);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            sendPrivileged(CONNECTOR_KEY(connector), serializedConfig, timer);
+            configLog.readToEnd().get(timer.remainingMs(), 
TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
             log.error("Failed to write connector configuration to Kafka: ", e);
             throw new ConnectException("Error writing connector configuration 
to Kafka", e);
@@ -490,9 +522,14 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
     public void removeConnectorConfig(String connector) {
         log.debug("Removing connector configuration for connector '{}'", 
connector);
         try {
-            sendPrivileged(CONNECTOR_KEY(connector), null);
-            sendPrivileged(TARGET_STATE_KEY(connector), null);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            List<ProducerKeyValue> keyValues = Arrays.asList(
+                    new ProducerKeyValue(CONNECTOR_KEY(connector), null),
+                    new ProducerKeyValue(TARGET_STATE_KEY(connector), null)
+            );
+            sendPrivileged(keyValues, timer);
+
+            configLog.readToEnd().get(timer.remainingMs(), 
TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
             log.error("Failed to remove connector configuration from Kafka: ", 
e);
             throw new ConnectException("Error removing connector configuration 
from Kafka", e);
@@ -521,10 +558,12 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
      */
     @Override
     public void putTaskConfigs(String connector, List<Map<String, String>> 
configs) {
+        Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
         // Make sure we're at the end of the log. We should be the only 
writer, but we want to make sure we don't have
         // any outstanding lagging data to consume.
         try {
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+            configLog.readToEnd().get(timer.remainingMs(), 
TimeUnit.MILLISECONDS);
+            timer.update();
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
             log.error("Failed to write root configuration to Kafka: ", e);
             throw new ConnectException("Error writing root configuration to 
Kafka", e);
@@ -532,34 +571,44 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
 
         int taskCount = configs.size();
 
-        // Start sending all the individual updates
+        // Send all the individual updates
         int index = 0;
+        List<ProducerKeyValue> keyValues = new ArrayList<>();
         for (Map<String, String> taskConfig: configs) {
             Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
             connectConfig.put("properties", taskConfig);
             byte[] serializedConfig = converter.fromConnectData(topic, 
TASK_CONFIGURATION_V0, connectConfig);
             log.debug("Writing configuration for connector '{}' task {}", 
connector, index);
             ConnectorTaskId connectorTaskId = new ConnectorTaskId(connector, 
index);
-            sendPrivileged(TASK_KEY(connectorTaskId), serializedConfig);
+            keyValues.add(new ProducerKeyValue(TASK_KEY(connectorTaskId), 
serializedConfig));
             index++;
         }
 
+        try {
+            sendPrivileged(keyValues, timer);
+        } catch (ExecutionException | InterruptedException | TimeoutException 
e) {
+            log.error("Failed to write task configurations to Kafka", e);
+            throw new ConnectException("Error writing task configurations to 
Kafka", e);
+        }
+
         // Finally, send the commit to update the number of tasks and apply 
the new configs, then wait until we read to
         // the end of the log
         try {
             // Read to end to ensure all the task configs have been written
             if (taskCount > 0) {
-                configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+                configLog.readToEnd().get(timer.remainingMs(), 
TimeUnit.MILLISECONDS);
+                timer.update();
             }
             // Write the commit message
             Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
             connectConfig.put("tasks", taskCount);
             byte[] serializedConfig = converter.fromConnectData(topic, 
CONNECTOR_TASKS_COMMIT_V0, connectConfig);
             log.debug("Writing commit for connector '{}' with {} tasks.", 
connector, taskCount);
-            sendPrivileged(COMMIT_TASKS_KEY(connector), serializedConfig);
+
+            sendPrivileged(COMMIT_TASKS_KEY(connector), serializedConfig, 
timer);
 
             // Read to end to ensure all the commit messages have been written
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+            configLog.readToEnd().get(timer.remainingMs(), 
TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
             log.error("Failed to write root configuration to Kafka: ", e);
             throw new ConnectException("Error writing root configuration to 
Kafka", e);
@@ -587,7 +636,12 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
         connectTargetState.put("state", state.name());
         byte[] serializedTargetState = converter.fromConnectData(topic, 
TARGET_STATE_V0, connectTargetState);
         log.debug("Writing target state {} for connector {}", state, 
connector);
-        configLog.send(TARGET_STATE_KEY(connector), serializedTargetState);
+        try {
+            configLog.send(TARGET_STATE_KEY(connector), 
serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            log.error("Failed to write target state to Kafka", e);
+            throw new ConnectException("Error writing target state to Kafka", 
e);
+        }
     }
 
     /**
@@ -608,8 +662,9 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
         byte[] serializedTaskCountRecord = converter.fromConnectData(topic, 
TASK_COUNT_RECORD_V0, taskCountRecord);
         log.debug("Writing task count record {} for connector {}", taskCount, 
connector);
         try {
-            sendPrivileged(TASK_COUNT_RECORD_KEY(connector), 
serializedTaskCountRecord);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            sendPrivileged(TASK_COUNT_RECORD_KEY(connector), 
serializedTaskCountRecord, timer);
+            configLog.readToEnd().get(timer.remainingMs(), 
TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
             log.error("Failed to write task count record with {} tasks for 
connector {} to Kafka: ", taskCount, connector, e);
             throw new ConnectException("Error writing task count record to 
Kafka", e);
@@ -635,8 +690,9 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
         sessionKeyStruct.put("creation-timestamp", 
sessionKey.creationTimestamp());
         byte[] serializedSessionKey = converter.fromConnectData(topic, 
SESSION_KEY_V0, sessionKeyStruct);
         try {
-            sendPrivileged(SESSION_KEY_KEY, serializedSessionKey);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            sendPrivileged(SESSION_KEY_KEY, serializedSessionKey, timer);
+            configLog.readToEnd().get(timer.remainingMs(), 
TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
             log.error("Failed to write session key to Kafka: ", e);
             throw new ConnectException("Error writing session key to Kafka", 
e);
@@ -658,8 +714,9 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
         value.put(ONLY_FAILED_FIELD_NAME, restartRequest.onlyFailed());
         byte[] serializedValue = converter.fromConnectData(topic, 
value.schema(), value);
         try {
-            sendPrivileged(key, serializedValue);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            sendPrivileged(key, serializedValue, timer);
+            configLog.readToEnd().get(timer.remainingMs(), 
TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
             log.error("Failed to write {} to Kafka: ", restartRequest, e);
             throw new ConnectException("Error writing " + restartRequest + " 
to Kafka", e);
@@ -711,9 +768,36 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
         return createKafkaBasedLog(topic, producerProps, consumerProps, new 
ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that 
{@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is 
configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     * @param timer Timer bounding how long this method can block. The timer 
is updated before the method returns.
+     */
+    private void sendPrivileged(String key, byte[] value, Timer timer) throws 
ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, 
value)), timer);
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that 
{@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is 
configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     * @param timer Timer bounding how long this method can block. The timer 
is updated before the method returns.
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues, Timer timer) 
throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> 
producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            timer.update();
+            for (Future<RecordMetadata> future : producerFutures) {
+                future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+                timer.update();
+            }
+
             return;
         }
 
@@ -723,8 +807,11 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
 
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            keyValues.forEach(
+                    keyValue -> fencableProducer.send(new 
ProducerRecord<>(topic, keyValue.key, keyValue.value))
+            );
             fencableProducer.commitTransaction();
+            timer.update();
         } catch (Exception e) {
             log.warn("Failed to perform fencable send to config topic", e);
             relinquishWritePrivileges();
@@ -732,6 +819,16 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
         }
     }
 
+    private static class ProducerKeyValue {
+        final String key;
+        final byte[] value;
+
+        ProducerKeyValue(String key, byte[] value) {
+            this.key = key;
+            this.value = value;
+        }
+    }
+
     private void relinquishWritePrivileges() {
         if (fencableProducer != null) {
             Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), 
"fencable producer for config topic");
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index cde63b3f833..899b42dd877 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
@@ -342,12 +343,29 @@ public class KafkaBasedLog<K, V> {
         return future;
     }
 
-    public void send(K key, V value) {
-        send(key, value, null);
+    /**
+     * Send a record asynchronously to the configured {@link #topic} without 
using a producer callback.
+     * @param key the key for the {@link ProducerRecord}
+     * @param value the value for the {@link ProducerRecord}
+     *
+     * @return the future from the call to {@link Producer#send}. {@link 
Future#get} can be called on this returned
+     *         future if synchronous behavior is desired.
+     */
+    public Future<RecordMetadata> send(K key, V value) {
+        return send(key, value, null);
     }
 
-    public void send(K key, V value, 
org.apache.kafka.clients.producer.Callback callback) {
-        producer.orElseThrow(() ->
+    /**
+     * Send a record asynchronously to the configured {@link #topic}
+     * @param key the key for the {@link ProducerRecord}
+     * @param value the value for the {@link ProducerRecord}
+     * @param callback the callback to invoke after completion; can be null if 
no callback is desired
+     *
+     * @return the future from the call to {@link Producer#send}. {@link 
Future#get} can be called on this returned
+     *         future if synchronous behavior is desired.
+     */
+    public Future<RecordMetadata> send(K key, V value, 
org.apache.kafka.clients.producer.Callback callback) {
+        return producer.orElseThrow(() ->
                 new IllegalStateException("This KafkaBasedLog was created in 
read-only mode and does not support write operations")
         ).send(new ProducerRecord<>(topic, key, value), callback);
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index eddabba8012..a50d4dfe898 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -248,6 +248,32 @@ public class AbstractHerderTest {
         assertEquals(ConnectorType.SOURCE, info.type());
     }
 
+    @Test
+    public void testPauseConnector() {
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, 
configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        when(configStore.contains(CONN1)).thenReturn(true);
+
+        herder.pauseConnector(CONN1);
+
+        verify(configStore).putTargetState(CONN1, TargetState.PAUSED);
+    }
+
+    @Test
+    public void testResumeConnector() {
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, 
configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        when(configStore.contains(CONN1)).thenReturn(true);
+
+        herder.resumeConnector(CONN1);
+
+        verify(configStore).putTargetState(CONN1, TargetState.STARTED);
+    }
+
     @Test
     public void testConnectorInfoMissingPlugin() {
         AbstractHerder herder = mock(AbstractHerder.class, withSettings()
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 1d79d59aa33..ec45e37c58b 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -739,6 +739,58 @@ public class DistributedHerderTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testCreateConnectorConfigBackingStoreError() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // mock the actual validation since its asynchronous nature is difficult to test and should
+        // be covered sufficiently by the unit tests for the AbstractHerder class
+        Capture<Callback<ConfigInfos>> validateCallback = newCapture();
+        herder.validateConnectorConfig(EasyMock.eq(CONN2_CONFIG), 
capture(validateCallback));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            validateCallback.getValue().onCompletion(null, CONN2_CONFIG_INFOS);
+            return null;
+        });
+
+        configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG);
+        PowerMock.expectLastCall().andThrow(new ConnectException("Error 
writing connector configuration to Kafka"));
+
+        // verify that the exception from config backing store write is propagated via the callback
+        
putConnectorCallback.onCompletion(EasyMock.anyObject(ConnectException.class), 
EasyMock.isNull());
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        // These will occur just before/during the second tick
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, 
putConnectorCallback);
+        // First tick runs the initial herder request, which issues an asynchronous request for
+        // connector validation
+        herder.tick();
+
+        // Once that validation is complete, another request is added to the herder request queue
+        // for actually performing the config write; this tick is for that request
+        herder.tick();
+
+        time.sleep(1000L);
+        assertStatistics(3, 1, 100, 1000L);
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testCreateConnectorFailedValidation() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
@@ -2596,6 +2648,7 @@ public class DistributedHerderTest {
         });
         member.wakeup();
         PowerMock.expectLastCall();
+
         configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
         PowerMock.expectLastCall().andAnswer(() -> {
             // Simulate response to writing config + waiting until end of log to be read
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 2197d7f7a09..8fb3bf30165 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -22,15 +22,19 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.RestartRequest;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -62,6 +66,8 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
@@ -72,6 +78,7 @@ import static 
org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_
 import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
 import static 
org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME;
 import static 
org.apache.kafka.connect.storage.KafkaConfigBackingStore.ONLY_FAILED_FIELD_NAME;
+import static 
org.apache.kafka.connect.storage.KafkaConfigBackingStore.READ_WRITE_TOTAL_TIMEOUT_MS;
 import static 
org.apache.kafka.connect.storage.KafkaConfigBackingStore.RESTART_KEY;
 import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
 import static org.junit.Assert.assertEquals;
@@ -170,14 +177,17 @@ public class KafkaConfigBackingStoreTest {
     KafkaBasedLog<String, byte[]> storeLog;
     @Mock
     Producer<String, byte[]> fencableProducer;
+    @Mock
+    Future<RecordMetadata> producerFuture;
     private KafkaConfigBackingStore configStorage;
 
-    private Capture<String> capturedTopic = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedProducerProps = 
EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedConsumerProps = 
EasyMock.newCapture();
-    private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = 
EasyMock.newCapture();
-    private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
-    private Capture<Callback<ConsumerRecord<String, byte[]>>> 
capturedConsumedCallback = EasyMock.newCapture();
+    private final Capture<String> capturedTopic = EasyMock.newCapture();
+    private final Capture<Map<String, Object>> capturedProducerProps = 
EasyMock.newCapture();
+    private final Capture<Map<String, Object>> capturedConsumerProps = 
EasyMock.newCapture();
+    private final Capture<Supplier<TopicAdmin>> capturedAdminSupplier = 
EasyMock.newCapture();
+    private final Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
+    private final Capture<Callback<ConsumerRecord<String, byte[]>>> 
capturedConsumedCallback = EasyMock.newCapture();
+    private final MockTime time = new MockTime();
 
     private long logOffset = 0;
 
@@ -193,7 +203,7 @@ public class KafkaConfigBackingStoreTest {
         configStorage = PowerMock.createPartialMock(
                 KafkaConfigBackingStore.class,
                 new String[]{"createKafkaBasedLog", "createFencableProducer"},
-                converter, config, null, null, CLIENT_ID_BASE);
+                converter, config, null, null, CLIENT_ID_BASE, time);
         Whitebox.setInternalState(configStorage, "configLog", storeLog);
         configStorage.setUpdateListener(configUpdateListener);
         // The mock must be reset and re-mocked for the remainder of the test.
@@ -326,6 +336,92 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testPutConnectorConfigProducerError() throws Exception {
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
+
+        expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, 
CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
+
+        storeLog.send(EasyMock.anyObject(), EasyMock.anyObject());
+        EasyMock.expectLastCall().andReturn(producerFuture);
+
+        producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
+        EasyMock.expectLastCall().andThrow(new ExecutionException(new 
TopicAuthorizationException(Collections.singleton("test"))));
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        configStorage.start();
+
+        // Verify initial state
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertEquals(0, configState.connectors().size());
+
+        // verify that the producer exception from KafkaBasedLog::send is propagated
+        ConnectException e = assertThrows(ConnectException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+        assertTrue(e.getMessage().contains("Error writing connector 
configuration to Kafka"));
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRemoveConnectorConfigSlowProducer() throws Exception {
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
+
+        @SuppressWarnings("unchecked")
+        Future<RecordMetadata> connectorConfigProducerFuture = 
PowerMock.createMock(Future.class);
+        // tombstone for the connector config
+        storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
+        EasyMock.expectLastCall().andReturn(connectorConfigProducerFuture);
+
+        @SuppressWarnings("unchecked")
+        Future<RecordMetadata> targetStateProducerFuture = 
PowerMock.createMock(Future.class);
+        // tombstone for the connector target state
+        storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
+        EasyMock.expectLastCall().andReturn(targetStateProducerFuture);
+
+        
connectorConfigProducerFuture.get(EasyMock.eq(READ_WRITE_TOTAL_TIMEOUT_MS), 
EasyMock.anyObject());
+        EasyMock.expectLastCall().andAnswer(() -> {
+            time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000);
+            return null;
+        });
+
+        // the future get timeout is expected to be reduced according to how long the previous Future::get took
+        targetStateProducerFuture.get(EasyMock.eq(1000L), 
EasyMock.anyObject());
+        EasyMock.expectLastCall().andAnswer(() -> {
+            time.sleep(1000);
+            return null;
+        });
+
+        @SuppressWarnings("unchecked")
+        Future<Void> future = PowerMock.createMock(Future.class);
+        EasyMock.expect(storeLog.readToEnd()).andAnswer(() -> future);
+
+        // the Future::get calls on the previous two producer futures exhausted the overall timeout; so expect the
+        // timeout on the log read future to be 0
+        EasyMock.expect(future.get(EasyMock.eq(0L), 
EasyMock.anyObject())).andReturn(null);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        configStorage.start();
+
+        configStorage.removeConnectorConfig("test-connector");
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testWritePrivileges() throws Exception {
         // With exactly.once.source.support = preparing (or also, "enabled"), we need to use a transactional producer
@@ -365,7 +461,9 @@ public class KafkaConfigBackingStoreTest {
         // In the meantime, write a target state (which doesn't require write privileges)
         expectConvert(KafkaConfigBackingStore.TARGET_STATE_V0, 
TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
         storeLog.send("target-state-" + CONNECTOR_IDS.get(1), 
CONFIGS_SERIALIZED.get(1));
-        PowerMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(producerFuture);
+        producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
+        EasyMock.expectLastCall().andReturn(null);
 
         // Reclaim write privileges
         expectFencableProducer();
@@ -1497,13 +1595,18 @@ public class KafkaConfigBackingStoreTest {
     // from the log. Validate the data that is captured when the conversion is performed matches the specified data
     // (by checking a single field's value)
     private void expectConvertWriteRead(final String configKey, final Schema 
valueSchema, final byte[] serialized,
-                                        final String dataFieldName, final 
Object dataFieldValue) {
+                                        final String dataFieldName, final 
Object dataFieldValue) throws Exception {
         final Capture<Struct> capturedRecord = EasyMock.newCapture();
         if (serialized != null)
             EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), 
EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
                     .andReturn(serialized);
+
         storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
-        PowerMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(producerFuture);
+
+        producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
+        EasyMock.expectLastCall().andReturn(null);
+
         EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), 
EasyMock.aryEq(serialized)))
                 .andAnswer(() -> {
                     if (dataFieldName != null)
@@ -1536,7 +1639,7 @@ public class KafkaConfigBackingStoreTest {
                 });
     }
 
-    private void expectConnectorRemoval(String configKey, String 
targetStateKey) {
+    private void expectConnectorRemoval(String configKey, String 
targetStateKey) throws Exception {
         expectConvertWriteRead(configKey, 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null);
         expectConvertWriteRead(targetStateKey, 
KafkaConfigBackingStore.TARGET_STATE_V0, null, null, null);
 
@@ -1547,7 +1650,7 @@ public class KafkaConfigBackingStoreTest {
     }
 
     private void expectConvertWriteAndRead(final String configKey, final 
Schema valueSchema, final byte[] serialized,
-                                           final String dataFieldName, final 
Object dataFieldValue) {
+                                           final String dataFieldName, final 
Object dataFieldValue) throws Exception {
         expectConvertWriteRead(configKey, valueSchema, serialized, 
dataFieldName, dataFieldValue);
         LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
         recordsToRead.put(configKey, serialized);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 652f4ff4dd7..9b91b55e92f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -261,7 +261,7 @@ public class EmbeddedConnectCluster {
         putIfAbsent(workerProps, OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, 
internalTopicsReplFactor);
         putIfAbsent(workerProps, CONFIG_TOPIC_CONFIG, "connect-config-topic-" 
+ connectClusterName);
         putIfAbsent(workerProps, CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, 
internalTopicsReplFactor);
-        putIfAbsent(workerProps, STATUS_STORAGE_TOPIC_CONFIG, 
"connect-storage-topic-" + connectClusterName);
+        putIfAbsent(workerProps, STATUS_STORAGE_TOPIC_CONFIG, 
"connect-status-topic-" + connectClusterName);
         putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, 
internalTopicsReplFactor);
         putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
         putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");


Reply via email to