This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 14940dc3ed Add pod name to TaskLocation for easier observability and debugging. (#14758)
14940dc3ed is described below
commit 14940dc3ede3ca5430d55b7764836b559ac4c500
Author: George Shiqi Wu <[email protected]>
AuthorDate: Mon Aug 7 15:33:35 2023 -0400
Add pod name to TaskLocation for easier observability and debugging. (#14758)
* Add pod name to location
* Add log
* fix style
* Update extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
Co-authored-by: Suneet Saldanha <[email protected]>
* Fix unit tests
---------
Co-authored-by: Suneet Saldanha <[email protected]>
---
.../k8s/overlord/KubernetesPeonLifecycle.java | 4 +-
.../k8s/overlord/KubernetesPeonLifecycleTest.java | 3 ++
.../kafka/supervisor/KafkaSupervisorTest.java | 50 +++++++++++-----------
.../kinesis/supervisor/KinesisSupervisorTest.java | 36 ++++++++--------
.../sql/resources/SqlStatementResourceTest.java | 8 ++--
.../indexing/overlord/TaskRunnerUtilsTest.java | 2 +-
.../druid/indexing/overlord/http/OverlordTest.java | 2 +-
.../org/apache/druid/indexer/TaskLocation.java | 32 +++++++++++---
.../org/apache/druid/indexer/TaskLocationTest.java | 24 +++++++++--
.../org/apache/druid/indexer/TaskStatusTest.java | 2 +-
10 files changed, 102 insertions(+), 61 deletions(-)
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index 302a568a23..447a8632bb 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -245,8 +245,10 @@ public class KubernetesPeonLifecycle
podStatus.getPodIP(),
DruidK8sConstants.PORT,
DruidK8sConstants.TLS_PORT,
-
Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED,
"false"))
+
Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED,
"false")),
+ pod.getMetadata() != null ? pod.getMetadata().getName() : ""
);
+ log.info("K8s task %s is running at location %s",
taskId.getOriginalTaskId(), taskLocation);
}
return taskLocation;
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 1ec726f3fa..980a425a85 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -815,6 +815,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
Assert.assertEquals("ip", location.getHost());
Assert.assertEquals(8100, location.getPort());
Assert.assertEquals(-1, location.getTlsPort());
+ Assert.assertEquals(ID, location.getK8sPodName());
verifyAll();
}
@@ -850,6 +851,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
Assert.assertEquals("ip", location.getHost());
Assert.assertEquals(8100, location.getPort());
Assert.assertEquals(-1, location.getTlsPort());
+ Assert.assertEquals(ID, location.getK8sPodName());
verifyAll();
}
@@ -886,6 +888,7 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
Assert.assertEquals("ip", location.getHost());
Assert.assertEquals(-1, location.getPort());
Assert.assertEquals(8091, location.getTlsPort());
+ Assert.assertEquals(ID, location.getK8sPodName());
verifyAll();
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 171883103f..3bf3a75e30 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -1431,7 +1431,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testBeginPublishAndQueueNextTasks() throws Exception
{
- final TaskLocation location = new TaskLocation("testHost", 1234, -1);
+ final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
final KafkaSupervisorTuningConfig tuningConfig =
supervisor.getTuningConfig();
@@ -1526,7 +1526,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testDiscoverExistingPublishingTask() throws Exception
{
- final TaskLocation location = new TaskLocation("testHost", 1234, -1);
+ final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
final KafkaSupervisorTuningConfig tuningConfig =
supervisor.getTuningConfig();
@@ -1646,7 +1646,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void
testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() throws
Exception
{
- final TaskLocation location = new TaskLocation("testHost", 1234, -1);
+ final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
final KafkaSupervisorTuningConfig tuningConfig =
supervisor.getTuningConfig();
@@ -1757,8 +1757,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testDiscoverExistingPublishingAndReadingTask() throws Exception
{
- final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
- final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
@@ -1876,8 +1876,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testReportWhenMultipleActiveTasks() throws Exception
{
- final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
- final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
supervisor = getTestableSupervisorForIdleBehaviour(1, 2, true, "PT10S",
null, null, false, null);
@@ -2034,8 +2034,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
addSomeEvents(100);
- final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
- final TaskLocation location2 = new TaskLocation("testHost", 234, -1);
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost", 234, -1);
Task id1 = createKafkaIndexTask(
"id1",
@@ -2339,8 +2339,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
addSomeEvents(100);
- final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
- final TaskLocation location2 = new TaskLocation("testHost", 234, -1);
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost", 234, -1);
Task id1 = createKafkaIndexTask(
"id1",
@@ -2548,7 +2548,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testKillUnresponsiveTasksWhilePausing() throws Exception
{
- final TaskLocation location = new TaskLocation("testHost", 1234, -1);
+ final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
addSomeEvents(100);
@@ -2634,7 +2634,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws
Exception
{
- final TaskLocation location = new TaskLocation("testHost", 1234, -1);
+ final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
addSomeEvents(100);
@@ -2749,8 +2749,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testStopGracefully() throws Exception
{
- final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
- final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
@@ -3017,8 +3017,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testResetRunningTasks() throws Exception
{
- final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
- final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
@@ -3277,8 +3277,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
tuningConfig
);
- final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
- final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
@@ -3472,8 +3472,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
// graceful shutdown is expected to be called on running tasks since state
is suspended
- final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
- final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, true,
kafkaHost);
@@ -4221,10 +4221,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
List<Task> tasks = ImmutableList.of(readingTask, publishingTask,
pausedTask, failsToResumePausedTask, waitingTask, pendingTask);
Collection taskRunnerWorkItems = ImmutableList.of(
- new TestTaskRunnerWorkItem(readingTask, null, new
TaskLocation("testHost", 1001, -1)),
- new TestTaskRunnerWorkItem(publishingTask, null, new
TaskLocation("testHost", 1002, -1)),
- new TestTaskRunnerWorkItem(pausedTask, null, new
TaskLocation("testHost", 1003, -1)),
- new TestTaskRunnerWorkItem(failsToResumePausedTask, null, new
TaskLocation("testHost", 1004, -1))
+ new TestTaskRunnerWorkItem(readingTask, null,
TaskLocation.create("testHost", 1001, -1)),
+ new TestTaskRunnerWorkItem(publishingTask, null,
TaskLocation.create("testHost", 1002, -1)),
+ new TestTaskRunnerWorkItem(pausedTask, null,
TaskLocation.create("testHost", 1003, -1)),
+ new TestTaskRunnerWorkItem(failsToResumePausedTask, null,
TaskLocation.create("testHost", 1004, -1))
);
DateTime startTime = DateTimes.nowUtc();
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 5a3295e0bf..e489061408 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -1475,7 +1475,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testBeginPublishAndQueueNextTasks() throws Exception
{
- final TaskLocation location = new TaskLocation("testHost", 1234, -1);
+ final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
@@ -1604,7 +1604,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testDiscoverExistingPublishingTask() throws Exception
{
- final TaskLocation location = new TaskLocation("testHost", 1234, -1);
+ final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID1, 0L,
SHARD_ID0, 20000000L);
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
@@ -1766,7 +1766,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void
testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() throws
Exception
{
- final TaskLocation location = new TaskLocation("testHost", 1234, -1);
+ final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID1, 9000L,
SHARD_ID0, 1234L);
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
@@ -1916,8 +1916,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testDiscoverExistingPublishingAndReadingTask() throws Exception
{
- final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
- final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID0, 100L,
SHARD_ID1, 200L);
@@ -2180,7 +2180,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testKillUnresponsiveTasksWhilePausing() throws Exception
{
- final TaskLocation location = new TaskLocation("testHost", 1234, -1);
+ final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
supervisorRecordSupplier.assign(EasyMock.anyObject());
@@ -2288,7 +2288,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws
Exception
{
- final TaskLocation location = new TaskLocation("testHost", 1234, -1);
+ final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
supervisorRecordSupplier.assign(EasyMock.anyObject());
@@ -2434,8 +2434,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testStopGracefully() throws Exception
{
- final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
- final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
@@ -2855,8 +2855,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testResetRunningTasks() throws Exception
{
- final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
- final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
@@ -3105,9 +3105,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
final Collection workItems = new ArrayList();
- workItems.add(new TestTaskRunnerWorkItem(id1, null, new
TaskLocation(id1.getId(), 8100, 8100)));
- workItems.add(new TestTaskRunnerWorkItem(id2, null, new
TaskLocation(id2.getId(), 8100, 8100)));
- workItems.add(new TestTaskRunnerWorkItem(id3, null, new
TaskLocation(id3.getId(), 8100, 8100)));
+ workItems.add(new TestTaskRunnerWorkItem(id1, null,
TaskLocation.create(id1.getId(), 8100, 8100)));
+ workItems.add(new TestTaskRunnerWorkItem(id2, null,
TaskLocation.create(id2.getId(), 8100, 8100)));
+ workItems.add(new TestTaskRunnerWorkItem(id3, null,
TaskLocation.create(id3.getId(), 8100, 8100)));
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems);
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1,
id2, id3)).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
@@ -3239,8 +3239,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null
);
- final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
- final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
@@ -3496,8 +3496,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
{
// graceful shutdown is expected to be called on running tasks since state
is suspended
- final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
- final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, true);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
index fa4d3c9f40..d7f04d8277 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
@@ -381,7 +381,7 @@ public class SqlStatementResourceTest extends MSQTestBase
TaskState.RUNNING,
null,
null,
- new TaskLocation("test", 0, 0),
+ TaskLocation.create("test", 0, 0),
null,
null
))));
@@ -403,7 +403,7 @@ public class SqlStatementResourceTest extends MSQTestBase
TaskState.SUCCESS,
null,
100L,
- new TaskLocation("test", 0, 0),
+ TaskLocation.create("test", 0, 0),
null,
null
))));
@@ -527,7 +527,7 @@ public class SqlStatementResourceTest extends MSQTestBase
TaskState.RUNNING,
null,
null,
- new TaskLocation("test", 0, 0),
+ TaskLocation.create("test", 0, 0),
null,
null
))));
@@ -549,7 +549,7 @@ public class SqlStatementResourceTest extends MSQTestBase
TaskState.SUCCESS,
null,
100L,
- new TaskLocation("test", 0, 0),
+ TaskLocation.create("test", 0, 0),
null,
null
))));
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
index 529bafd15f..820b2e893c 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
@@ -46,7 +46,7 @@ public class TaskRunnerUtilsTest
public void testMakeTaskLocationURL()
{
final URL url = TaskRunnerUtils.makeTaskLocationURL(
- new TaskLocation("1.2.3.4", 8090, 8290),
+ TaskLocation.create("1.2.3.4", 8090, 8290),
"/druid/worker/v1/task/%s/log",
"foo bar&"
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 0b9c77ef66..3f8c1a9870 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -98,7 +98,7 @@ import java.util.concurrent.Executor;
public class OverlordTest
{
- private static final TaskLocation TASK_LOCATION = new TaskLocation("dummy",
1000, -1);
+ private static final TaskLocation TASK_LOCATION =
TaskLocation.create("dummy", 1000, -1);
private TestingServer server;
private Timing timing;
diff --git
a/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java
b/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java
index fe8accd33d..21e2006211 100644
--- a/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java
+++ b/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.IAE;
@@ -30,21 +31,29 @@ import java.util.Objects;
public class TaskLocation
{
- private static final TaskLocation UNKNOWN = new TaskLocation(null, -1, -1);
+ private static final TaskLocation UNKNOWN = new TaskLocation(null, -1, -1,
null);
@Nullable
private final String host;
private final int port;
private final int tlsPort;
+ @Nullable
+ private final String k8sPodName;
+
public static TaskLocation create(String host, int port, int tlsPort)
{
- return new TaskLocation(host, port, tlsPort);
+ return new TaskLocation(host, port, tlsPort, null);
}
public static TaskLocation create(String host, int port, int tlsPort,
boolean isTls)
{
- return isTls ? new TaskLocation(host, -1, tlsPort) : new
TaskLocation(host, port, -1);
+ return create(host, port, tlsPort, isTls, null);
+ }
+
+ public static TaskLocation create(String host, int port, int tlsPort,
boolean isTls, @Nullable String k8sPodName)
+ {
+ return isTls ? new TaskLocation(host, -1, tlsPort, k8sPodName) : new
TaskLocation(host, port, -1, k8sPodName);
}
public static TaskLocation unknown()
@@ -56,12 +65,14 @@ public class TaskLocation
public TaskLocation(
@JsonProperty("host") @Nullable String host,
@JsonProperty("port") int port,
- @JsonProperty("tlsPort") int tlsPort
+ @JsonProperty("tlsPort") int tlsPort,
+ @JsonProperty("k8sPodName") @Nullable String k8sPodName
)
{
this.host = host;
this.port = port;
this.tlsPort = tlsPort;
+ this.k8sPodName = k8sPodName;
}
@Nullable
@@ -83,6 +94,14 @@ public class TaskLocation
return tlsPort;
}
+ @Nullable
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty
+ public String getK8sPodName()
+ {
+ return k8sPodName;
+ }
+
public URL makeURL(final String encodedPathAndQueryString) throws
MalformedURLException
{
final String scheme;
@@ -111,6 +130,7 @@ public class TaskLocation
"host='" + host + '\'' +
", port=" + port +
", tlsPort=" + tlsPort +
+ ", k8sPodName=" + k8sPodName +
'}';
}
@@ -124,12 +144,12 @@ public class TaskLocation
return false;
}
TaskLocation that = (TaskLocation) o;
- return port == that.port && tlsPort == that.tlsPort &&
Objects.equals(host, that.host);
+ return port == that.port && tlsPort == that.tlsPort &&
Objects.equals(host, that.host) && Objects.equals(k8sPodName, that.k8sPodName);
}
@Override
public int hashCode()
{
- return Objects.hash(host, port, tlsPort);
+ return Objects.hash(host, port, tlsPort, k8sPodName);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java
b/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java
index 1822915fea..03a751c5dd 100644
--- a/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java
+++ b/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java
@@ -33,13 +33,13 @@ public class TaskLocationTest
@SuppressWarnings("HttpUrlsUsage")
public void testMakeURL() throws MalformedURLException
{
- Assert.assertEquals(new URL("http://abc:80/foo"), new TaskLocation("abc",
80, 0).makeURL("/foo"));
- Assert.assertEquals(new URL("http://abc:80/foo"), new TaskLocation("abc",
80, -1).makeURL("/foo"));
- Assert.assertEquals(new URL("https://abc:443/foo"), new
TaskLocation("abc", 80, 443).makeURL("/foo"));
+ Assert.assertEquals(new URL("http://abc:80/foo"), new TaskLocation("abc",
80, 0, null).makeURL("/foo"));
+ Assert.assertEquals(new URL("http://abc:80/foo"), new TaskLocation("abc",
80, -1, null).makeURL("/foo"));
+ Assert.assertEquals(new URL("https://abc:443/foo"), new
TaskLocation("abc", 80, 443, null).makeURL("/foo"));
Assert.assertThrows(
"URL that does not start with '/'",
IllegalArgumentException.class,
- () -> new TaskLocation("abc", 80, 443).makeURL("foo")
+ () -> new TaskLocation("abc", 80, 443, null).makeURL("foo")
);
}
@@ -54,6 +54,22 @@ public class TaskLocationTest
Assert.assertEquals(2, tls.getTlsPort());
}
+ @Test
+ public void testDefaultK8sJobName()
+ {
+ TaskLocation noK8sJobName = TaskLocation.create("foo", 1, 2, false);
+ Assert.assertNull(noK8sJobName.getK8sPodName());
+ noK8sJobName = TaskLocation.create("foo", 1, 2);
+ Assert.assertNull(noK8sJobName.getK8sPodName());
+ }
+
+ @Test
+ public void testK8sJobNameSet()
+ {
+ TaskLocation k8sJobName = TaskLocation.create("foo", 1, 2, false,
"job-name");
+ Assert.assertEquals("job-name", k8sJobName.getK8sPodName());
+ }
+
@Test
public void testEqualsAndHashCode()
{
diff --git
a/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java
b/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java
index 939d9f04d3..d0cf6b3d2c 100644
--- a/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java
+++ b/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java
@@ -59,7 +59,7 @@ public class TaskStatusTest
);
Assert.assertEquals(statusNoLocation, mapper.readValue(jsonNoLocation,
TaskStatus.class));
- TaskStatus success = TaskStatus.success("forkTaskID", new
TaskLocation("localhost", 0, 1));
+ TaskStatus success = TaskStatus.success("forkTaskID",
TaskLocation.create("localhost", 0, 1));
Assert.assertEquals(success.getLocation().getHost(), "localhost");
Assert.assertEquals(success.getLocation().getPort(), 0);
Assert.assertEquals(success.getLocation().getTlsPort(), 1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]