This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0457c71d039 Fix k8sAndWorker mode in a zookeeper-less environment
(#15445)
0457c71d039 is described below
commit 0457c71d0399904148f581d62d92099670d615bd
Author: YongGang <[email protected]>
AuthorDate: Fri Jan 12 09:30:01 2024 -0800
Fix k8sAndWorker mode in a zookeeper-less environment (#15445)
* Fix k8sAndWorker mode in a zookeeper-less environment
* add unit test
* code reformat
* minor refine
* change to inject Provider
* correct style
* bind HttpRemoteTaskRunnerFactory as provider
* change to bind on TaskRunnerFactory
* fix styling
---
.../KubernetesAndWorkerTaskRunnerFactory.java | 24 +---
.../k8s/overlord/KubernetesOverlordModule.java | 25 ++++
.../KubernetesAndWorkerTaskRunnerFactoryTest.java | 55 +--------
.../k8s/overlord/KubernetesOverlordModuleTest.java | 130 +++++++++++++++++++++
4 files changed, 166 insertions(+), 68 deletions(-)
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
index de6db915c8a..9b482e75dc9 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
@@ -20,10 +20,9 @@
package org.apache.druid.k8s.overlord;
import com.google.inject.Inject;
-import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
+import com.google.inject.name.Named;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
-import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
@@ -32,9 +31,7 @@ public class KubernetesAndWorkerTaskRunnerFactory implements
TaskRunnerFactory<K
public static final String TYPE_NAME = "k8sAndWorker";
private final KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory;
- private final HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
- private final RemoteTaskRunnerFactory remoteTaskRunnerFactory;
- private final KubernetesAndWorkerTaskRunnerConfig
kubernetesAndWorkerTaskRunnerConfig;
+ private final TaskRunnerFactory<? extends WorkerTaskRunner>
taskRunnerFactory;
private final RunnerStrategy runnerStrategy;
private KubernetesAndWorkerTaskRunner runner;
@@ -42,16 +39,12 @@ public class KubernetesAndWorkerTaskRunnerFactory
implements TaskRunnerFactory<K
@Inject
public KubernetesAndWorkerTaskRunnerFactory(
KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory,
- HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory,
- RemoteTaskRunnerFactory remoteTaskRunnerFactory,
- KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig,
+ @Named("taskRunnerFactory") TaskRunnerFactory<? extends
WorkerTaskRunner> taskRunnerFactory,
RunnerStrategy runnerStrategy
)
{
this.kubernetesTaskRunnerFactory = kubernetesTaskRunnerFactory;
- this.httpRemoteTaskRunnerFactory = httpRemoteTaskRunnerFactory;
- this.remoteTaskRunnerFactory = remoteTaskRunnerFactory;
- this.kubernetesAndWorkerTaskRunnerConfig =
kubernetesAndWorkerTaskRunnerConfig;
+ this.taskRunnerFactory = taskRunnerFactory;
this.runnerStrategy = runnerStrategy;
}
@@ -60,19 +53,12 @@ public class KubernetesAndWorkerTaskRunnerFactory
implements TaskRunnerFactory<K
{
runner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunnerFactory.build(),
- getWorkerTaskRunner(),
+ taskRunnerFactory.build(),
runnerStrategy
);
return runner;
}
- private WorkerTaskRunner getWorkerTaskRunner()
- {
- String workerType = kubernetesAndWorkerTaskRunnerConfig.getWorkerType();
- return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType) ?
- httpRemoteTaskRunnerFactory.build() :
remoteTaskRunnerFactory.build();
- }
-
@Override
public KubernetesAndWorkerTaskRunner get()
{
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
index afd9d9a7c4e..1e52a3583b2 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
@@ -21,10 +21,12 @@ package org.apache.druid.k8s.overlord;
import com.google.inject.Binder;
import com.google.inject.Inject;
+import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
+import com.google.inject.name.Named;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.druid.discovery.NodeRole;
@@ -37,8 +39,11 @@ import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
import org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
+import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
+import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
+import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
@@ -130,6 +135,26 @@ public class KubernetesOverlordModule implements
DruidModule
return client;
}
+ /**
+ * Provides a TaskRunnerFactory instance suitable for environments without
Zookeeper.
+ * In such environments, the standard RemoteTaskRunnerFactory may not be
operational.
+ * Depending on the workerType defined in
KubernetesAndWorkerTaskRunnerConfig,
+ * this method selects and returns an appropriate TaskRunnerFactory
implementation.
+ */
+ @Provides
+ @LazySingleton
+ @Named("taskRunnerFactory")
+ TaskRunnerFactory<? extends WorkerTaskRunner> provideWorkerTaskRunner(
+ KubernetesAndWorkerTaskRunnerConfig runnerConfig,
+ Injector injector
+ )
+ {
+ String workerType = runnerConfig.getWorkerType();
+ return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType)
+ ? injector.getInstance(HttpRemoteTaskRunnerFactory.class)
+ : injector.getInstance(RemoteTaskRunnerFactory.class);
+ }
+
private static class RunnerStrategyProvider implements
Provider<RunnerStrategy>
{
private KubernetesAndWorkerTaskRunnerConfig runnerConfig;
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
index 88696017d05..b309d4ef04b 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
@@ -20,9 +20,8 @@
package org.apache.druid.k8s.overlord;
-import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
-import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
-import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy;
+import org.apache.druid.indexing.overlord.TaskRunnerFactory;
+import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
@@ -37,59 +36,17 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest
extends EasyMockSupport
{
@Mock KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory;
- @Mock HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
- @Mock RemoteTaskRunnerFactory remoteTaskRunnerFactory;
+ @Mock TaskRunnerFactory<? extends WorkerTaskRunner> taskRunnerFactory;
@Test
- public void test_useHttpTaskRunner_asDefault()
+ public void test_buildKubernetesTaskRunnerSuccessfully()
{
KubernetesAndWorkerTaskRunnerFactory factory = new
KubernetesAndWorkerTaskRunnerFactory(
kubernetesTaskRunnerFactory,
- httpRemoteTaskRunnerFactory,
- remoteTaskRunnerFactory,
- new KubernetesAndWorkerTaskRunnerConfig(null, null),
+ taskRunnerFactory,
new WorkerRunnerStrategy()
);
-
- EasyMock.expect(httpRemoteTaskRunnerFactory.build()).andReturn(null);
- EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
-
- replayAll();
- factory.build();
- verifyAll();
- }
-
- @Test
- public void test_specifyRemoteTaskRunner()
- {
- KubernetesAndWorkerTaskRunnerFactory factory = new
KubernetesAndWorkerTaskRunnerFactory(
- kubernetesTaskRunnerFactory,
- httpRemoteTaskRunnerFactory,
- remoteTaskRunnerFactory,
- new KubernetesAndWorkerTaskRunnerConfig(null, "remote"),
- new WorkerRunnerStrategy()
- );
-
- EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
- EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
-
- replayAll();
- factory.build();
- verifyAll();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void test_specifyIncorrectTaskRunner_shouldThrowException()
- {
- KubernetesAndWorkerTaskRunnerFactory factory = new
KubernetesAndWorkerTaskRunnerFactory(
- kubernetesTaskRunnerFactory,
- httpRemoteTaskRunnerFactory,
- remoteTaskRunnerFactory,
- new KubernetesAndWorkerTaskRunnerConfig(null, "noop"),
- new KubernetesRunnerStrategy()
- );
-
- EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
+ EasyMock.expect(taskRunnerFactory.build()).andReturn(null);
EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
replayAll();
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
new file mode 100644
index 00000000000..b83ec562bda
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.ProvisionException;
+import org.apache.druid.guice.ConfigModule;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
+import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
+import org.apache.druid.jackson.JacksonModule;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.server.DruidNode;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Properties;
+
+@RunWith(EasyMockRunner.class)
+public class KubernetesOverlordModuleTest
+{
+ @Mock
+ private ServiceEmitter serviceEmitter;
+ @Mock
+ private TaskConfig taskConfig;
+ @Mock
+ private HttpClient httpClient;
+ @Mock
+ private RemoteTaskRunnerFactory remoteTaskRunnerFactory;
+ @Mock
+ private HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
+ private Injector injector;
+
+ @Test
+ public void testDefaultHttpRemoteTaskRunnerFactoryBindSuccessfully()
+ {
+ injector = makeInjectorWithProperties(initializePropertes(false), false,
true);
+ KubernetesAndWorkerTaskRunnerFactory taskRunnerFactory =
injector.getInstance(
+ KubernetesAndWorkerTaskRunnerFactory.class);
+ Assert.assertNotNull(taskRunnerFactory);
+
+ Assert.assertNotNull(taskRunnerFactory.build());
+ }
+
+ @Test
+ public void testRemoteTaskRunnerFactoryBindSuccessfully()
+ {
+ injector = makeInjectorWithProperties(initializePropertes(true), true,
false);
+ KubernetesAndWorkerTaskRunnerFactory taskRunnerFactory =
injector.getInstance(
+ KubernetesAndWorkerTaskRunnerFactory.class);
+ Assert.assertNotNull(taskRunnerFactory);
+
+ Assert.assertNotNull(taskRunnerFactory.build());
+ }
+
+ @Test(expected = ProvisionException.class)
+ public void testExceptionThrownIfNoTaskRunnerFactoryBind()
+ {
+ injector = makeInjectorWithProperties(initializePropertes(false), false,
false);
+ injector.getInstance(KubernetesAndWorkerTaskRunnerFactory.class);
+ }
+
+ private Injector makeInjectorWithProperties(
+ final Properties props,
+ boolean isWorkerTypeRemote,
+ boolean isWorkerTypeHttpRemote
+ )
+ {
+ return Guice.createInjector(
+ ImmutableList.of(
+ new DruidGuiceExtensions(),
+ new JacksonModule(),
+
+ binder -> {
+ binder.bind(Properties.class).toInstance(props);
+ binder.bind(ServiceEmitter.class).toInstance(serviceEmitter);
+
binder.bind(HttpClient.class).annotatedWith(EscalatedGlobal.class).toInstance(httpClient);
+ binder.bind(TaskConfig.class).toInstance(taskConfig);
+ binder.bind(DruidNode.class)
+ .annotatedWith(Self.class)
+ .toInstance(new DruidNode("test-inject", null, false,
null, null, true, false));
+ if (isWorkerTypeRemote) {
+
binder.bind(RemoteTaskRunnerFactory.class).toInstance(remoteTaskRunnerFactory);
+ }
+ if (isWorkerTypeHttpRemote) {
+
binder.bind(HttpRemoteTaskRunnerFactory.class).toInstance(httpRemoteTaskRunnerFactory);
+ }
+ },
+ new ConfigModule(),
+ new KubernetesOverlordModule()
+ ));
+ }
+
+ private static Properties initializePropertes(boolean isWorkerTypeRemote)
+ {
+ final Properties props = new Properties();
+ props.put("druid.indexer.runner.namespace", "NAMESPACE");
+ props.put("druid.indexer.runner.k8sAndWorker.runnerStrategy.type", "k8s");
+ if (isWorkerTypeRemote) {
+ props.put("druid.indexer.runner.k8sAndWorker.runnerStrategy.workerType",
"remote");
+ }
+ return props;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]