yashmayya commented on code in PR #12947: URL: https://github.com/apache/kafka/pull/12947#discussion_r1042468179
########## connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java: ##########
@@ -21,41 +21,51 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
+    @Override
+    protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient restClient) {
+
+        OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore,
+                connectorClientConfigOverridePolicy);
+
+        return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy);
+    }
+
+    @Override
+    protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+        return new StandaloneConfig(workerProps);
+    }
+
     public static void main(String[] args) {

Review Comment:
While I agree that the two `main` methods (standalone and distributed) look pretty similar, I'm not sure there's a way to refactor them to reuse common code while keeping the result clean and readable. Were you thinking of something along the lines of a static helper method that takes an argument indicating whether the mode is standalone or distributed (perhaps even a `Class<>` instance)? That approach could get messy too if we decide not to keep the connector creation logic in the base `startConnect` method and have to move it back into `ConnectStandalone`'s startup logic (https://github.com/apache/kafka/pull/12947/files#r1039705222).

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org