This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 757603a773 Set task location as k8sPodName for mm-less ingestion (#14959)
757603a773 is described below
commit 757603a773d4459a8a2be2ba74d972d70eadb3f6
Author: Suneet Saldanha <[email protected]>
AuthorDate: Mon Sep 11 19:44:26 2023 -0700
Set task location as k8sPodName for mm-less ingestion (#14959)
* Set task location as k8sPodName for mm-less ingestion
* tests
---
.../org/apache/druid/indexer/TaskLocation.java | 21 +++++++++++
.../org/apache/druid/indexer/TaskLocationTest.java | 43 ++++++++++++++++++++++
.../druid/sql/calcite/schema/SystemSchema.java | 19 +---------
3 files changed, 66 insertions(+), 17 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java b/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java
index 21e2006211..2992814de4 100644
--- a/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java
+++ b/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java
@@ -20,8 +20,10 @@
package org.apache.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.net.HostAndPort;
import org.apache.druid.java.util.common.IAE;
import javax.annotation.Nullable;
@@ -102,6 +104,25 @@ public class TaskLocation
return k8sPodName;
}
+ @JsonIgnore
+ @Nullable
+ public String getLocation()
+ {
+ if (k8sPodName != null) {
+ return k8sPodName;
+ } else if (host == null) {
+ return null;
+ } else {
+ final int thePort;
+ if (tlsPort >= 0) {
+ thePort = tlsPort;
+ } else {
+ thePort = port;
+ }
+ return HostAndPort.fromParts(host, thePort).toString();
+ }
+ }
+
public URL makeURL(final String encodedPathAndQueryString) throws
MalformedURLException
{
final String scheme;
diff --git a/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java b/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java
index 03a751c5dd..f7bad61d67 100644
--- a/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java
+++ b/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexer;
+import com.google.common.net.HostAndPort;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Assert;
import org.junit.Test;
@@ -75,4 +76,46 @@ public class TaskLocationTest
{
EqualsVerifier.forClass(TaskLocation.class).usingGetClass().verify();
}
+
+ @Test
+ public void testGetLocationWithK8sPodNameShouldReturnK8sPodName()
+ {
+ TaskLocation taskLocation = TaskLocation.create("foo", 1, 2, false,
"job-name");
+ Assert.assertEquals("job-name", taskLocation.getLocation());
+ }
+
+ @Test
+ public void testGetLocationWithK8sPodNameAndTlsShouldReturnK8sPodName()
+ {
+ TaskLocation taskLocation = TaskLocation.create("foo", 1, 2, true,
"job-name");
+ Assert.assertEquals("job-name", taskLocation.getLocation());
+ }
+
+ @Test
+ public void testGetLocationWithK8sPodNameAndNoHostShouldReturnK8sPodName()
+ {
+ TaskLocation taskLocation = TaskLocation.create(null, 1, 2, true,
"job-name");
+ Assert.assertEquals("job-name", taskLocation.getLocation());
+ }
+
+ @Test
+ public void testGetLocationWithoutK8sPodNameAndHostShouldReturnNull()
+ {
+ TaskLocation taskLocation = TaskLocation.create(null, 1, 2, false);
+ Assert.assertNull(taskLocation.getLocation());
+ }
+
+ @Test
+ public void
testGetLocationWithoutK8sPodNameAndNoTlsPortShouldReturnLocation()
+ {
+ TaskLocation taskLocation = TaskLocation.create("foo", 1, -1, false);
+ Assert.assertEquals(HostAndPort.fromParts("foo", 1).toString(),
taskLocation.getLocation());
+ }
+
+ @Test
+ public void
testGetLocationWithoutK8sPodNameAndNonZeroTlsPortShouldReturnLocation()
+ {
+ TaskLocation taskLocation = TaskLocation.create("foo", 1, 2, true);
+ Assert.assertEquals(HostAndPort.fromParts("foo", 2).toString(),
taskLocation.getLocation());
+ }
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index db4a10af81..22d1637bea 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -30,7 +30,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.Futures;
import com.google.inject.Inject;
import org.apache.calcite.DataContext;
@@ -822,21 +821,7 @@ public class SystemSchema extends AbstractSchema
public Object[] current()
{
final TaskStatusPlus task = it.next();
- @Nullable final String host = task.getLocation().getHost();
- @Nullable final String hostAndPort;
- if (host == null) {
- hostAndPort = null;
- } else {
- final int port;
- if (task.getLocation().getTlsPort() >= 0) {
- port = task.getLocation().getTlsPort();
- } else {
- port = task.getLocation().getPort();
- }
-
- hostAndPort = HostAndPort.fromParts(host, port).toString();
- }
return new Object[]{
task.getId(),
task.getGroupId(),
@@ -847,8 +832,8 @@ public class SystemSchema extends AbstractSchema
toStringOrNull(task.getStatusCode()),
toStringOrNull(task.getRunnerStatusCode()),
task.getDuration() == null ? 0L : task.getDuration(),
- hostAndPort,
- host,
+ task.getLocation().getLocation(),
+ task.getLocation().getHost(),
(long) task.getLocation().getPort(),
(long) task.getLocation().getTlsPort(),
task.getErrorMsg()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]