This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 757603a773 Set task location as k8sPodName for mm-less ingestion 
(#14959)
757603a773 is described below

commit 757603a773d4459a8a2be2ba74d972d70eadb3f6
Author: Suneet Saldanha <[email protected]>
AuthorDate: Mon Sep 11 19:44:26 2023 -0700

    Set task location as k8sPodName for mm-less ingestion (#14959)
    
    * Set task location as k8sPodName for mm-less ingestion
    
    * tests
---
 .../org/apache/druid/indexer/TaskLocation.java     | 21 +++++++++++
 .../org/apache/druid/indexer/TaskLocationTest.java | 43 ++++++++++++++++++++++
 .../druid/sql/calcite/schema/SystemSchema.java     | 19 +---------
 3 files changed, 66 insertions(+), 17 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java 
b/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java
index 21e2006211..2992814de4 100644
--- a/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java
+++ b/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java
@@ -20,8 +20,10 @@
 package org.apache.druid.indexer;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.net.HostAndPort;
 import org.apache.druid.java.util.common.IAE;
 
 import javax.annotation.Nullable;
@@ -102,6 +104,25 @@ public class TaskLocation
     return k8sPodName;
   }
 
+  @JsonIgnore
+  @Nullable
+  public String getLocation()
+  {
+    if (k8sPodName != null) {
+      return k8sPodName;
+    } else if (host == null) {
+      return null;
+    } else {
+      final int thePort;
+      if (tlsPort >= 0) {
+        thePort = tlsPort;
+      } else {
+        thePort = port;
+      }
+      return HostAndPort.fromParts(host, thePort).toString();
+    }
+  }
+
   public URL makeURL(final String encodedPathAndQueryString) throws 
MalformedURLException
   {
     final String scheme;
diff --git 
a/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java 
b/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java
index 03a751c5dd..f7bad61d67 100644
--- a/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java
+++ b/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexer;
 
 
+import com.google.common.net.HostAndPort;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.junit.Assert;
 import org.junit.Test;
@@ -75,4 +76,46 @@ public class TaskLocationTest
   {
     EqualsVerifier.forClass(TaskLocation.class).usingGetClass().verify();
   }
+
+  @Test
+  public void testGetLocationWithK8sPodNameShouldReturnK8sPodName()
+  {
+    TaskLocation taskLocation = TaskLocation.create("foo", 1, 2, false, 
"job-name");
+    Assert.assertEquals("job-name", taskLocation.getLocation());
+  }
+
+  @Test
+  public void testGetLocationWithK8sPodNameAndTlsShouldReturnK8sPodName()
+  {
+    TaskLocation taskLocation = TaskLocation.create("foo", 1, 2, true, 
"job-name");
+    Assert.assertEquals("job-name", taskLocation.getLocation());
+  }
+
+  @Test
+  public void testGetLocationWithK8sPodNameAndNoHostShouldReturnK8sPodName()
+  {
+    TaskLocation taskLocation = TaskLocation.create(null, 1, 2, true, 
"job-name");
+    Assert.assertEquals("job-name", taskLocation.getLocation());
+  }
+
+  @Test
+  public void testGetLocationWithoutK8sPodNameAndHostShouldReturnNull()
+  {
+    TaskLocation taskLocation = TaskLocation.create(null, 1, 2, false);
+    Assert.assertNull(taskLocation.getLocation());
+  }
+
+  @Test
+  public void 
testGetLocationWithoutK8sPodNameAndNoTlsPortShouldReturnLocation()
+  {
+    TaskLocation taskLocation = TaskLocation.create("foo", 1, -1, false);
+    Assert.assertEquals(HostAndPort.fromParts("foo", 1).toString(), 
taskLocation.getLocation());
+  }
+
+  @Test
+  public void 
testGetLocationWithoutK8sPodNameAndNonZeroTlsPortShouldReturnLocation()
+  {
+    TaskLocation taskLocation = TaskLocation.create("foo", 1, 2, true);
+    Assert.assertEquals(HostAndPort.fromParts("foo", 2).toString(), 
taskLocation.getLocation());
+  }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index db4a10af81..22d1637bea 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -30,7 +30,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.net.HostAndPort;
 import com.google.common.util.concurrent.Futures;
 import com.google.inject.Inject;
 import org.apache.calcite.DataContext;
@@ -822,21 +821,7 @@ public class SystemSchema extends AbstractSchema
             public Object[] current()
             {
               final TaskStatusPlus task = it.next();
-              @Nullable final String host = task.getLocation().getHost();
-              @Nullable final String hostAndPort;
 
-              if (host == null) {
-                hostAndPort = null;
-              } else {
-                final int port;
-                if (task.getLocation().getTlsPort() >= 0) {
-                  port = task.getLocation().getTlsPort();
-                } else {
-                  port = task.getLocation().getPort();
-                }
-
-                hostAndPort = HostAndPort.fromParts(host, port).toString();
-              }
               return new Object[]{
                   task.getId(),
                   task.getGroupId(),
@@ -847,8 +832,8 @@ public class SystemSchema extends AbstractSchema
                   toStringOrNull(task.getStatusCode()),
                   toStringOrNull(task.getRunnerStatusCode()),
                   task.getDuration() == null ? 0L : task.getDuration(),
-                  hostAndPort,
-                  host,
+                  task.getLocation().getLocation(),
+                  task.getLocation().getHost(),
                   (long) task.getLocation().getPort(),
                   (long) task.getLocation().getTlsPort(),
                   task.getErrorMsg()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to