This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch 29.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/29.0.0 by this push:
     new f3a89ba8e73 Report zero values instead of unknown for empty ingest 
queries (#15674) (#15791)
f3a89ba8e73 is described below

commit f3a89ba8e73c163dabe16d17b0a86619086153da
Author: Laksh Singla <[email protected]>
AuthorDate: Tue Jan 30 21:33:13 2024 +0530

    Report zero values instead of unknown for empty ingest queries (#15674) 
(#15791)
    
    MSQ now allows empty ingest queries by default. For such queries that don't 
generate any output rows, the query counters in the async status result 
object/task report don't contain numTotalRows and totalSizeInBytes. These 
properties, when not set/undefined, can be confusing to API clients. For example, 
the web-console treats them as unknown values.
    
    This patch fixes the counters by explicitly reporting them as 0 instead of 
null for empty ingest queries.
    
    Co-authored-by: Abhishek Radhakrishnan <[email protected]>
---
 .../druid/msq/util/SqlStatementResourceHelper.java |  20 +--
 .../resources/SqlMSQStatementResourcePostTest.java |  60 ++++++-
 .../org/apache/druid/msq/test/MSQTestBase.java     |   4 +-
 .../msq/util/SqlStatementResourceHelperTest.java   | 193 +++++++++++++++++----
 .../druid/server/router/QueryHostFinder.java       |  12 +-
 5 files changed, 228 insertions(+), 61 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
index 9481fc60541..b60f81ecca6 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
@@ -172,31 +172,22 @@ public class SqlStatementResourceHelper
       for (CounterSnapshots counterSnapshots : workerCounters.values()) {
         QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getMap()
                                                                     
.getOrDefault("segmentGenerationProgress", null);
-        if (queryCounterSnapshot != null && queryCounterSnapshot instanceof 
SegmentGenerationProgressCounter.Snapshot) {
+        if (queryCounterSnapshot instanceof 
SegmentGenerationProgressCounter.Snapshot) {
           rows += ((SegmentGenerationProgressCounter.Snapshot) 
queryCounterSnapshot).getRowsPushed();
         }
       }
-      if (rows != 0L) {
-        return Optional.of(ImmutableList.of(new PageInformation(0, rows, 
null)));
-      } else {
-        return Optional.empty();
-      }
+      return Optional.of(ImmutableList.of(new PageInformation(0, rows, null)));
     } else if (msqDestination instanceof TaskReportMSQDestination) {
       long rows = 0L;
       long size = 0L;
       for (CounterSnapshots counterSnapshots : workerCounters.values()) {
         QueryCounterSnapshot queryCounterSnapshot = 
counterSnapshots.getMap().getOrDefault("output", null);
-        if (queryCounterSnapshot != null && queryCounterSnapshot instanceof 
ChannelCounters.Snapshot) {
+        if (queryCounterSnapshot instanceof ChannelCounters.Snapshot) {
           rows += Arrays.stream(((ChannelCounters.Snapshot) 
queryCounterSnapshot).getRows()).sum();
           size += Arrays.stream(((ChannelCounters.Snapshot) 
queryCounterSnapshot).getBytes()).sum();
         }
       }
-      if (rows != 0L) {
-        return Optional.of(ImmutableList.of(new PageInformation(0, rows, 
size)));
-      } else {
-        return Optional.empty();
-      }
-
+      return Optional.of(ImmutableList.of(new PageInformation(0, rows, size)));
     } else if (msqDestination instanceof DurableStorageMSQDestination) {
 
       return populatePagesForDurableStorageDestination(finalStage, 
workerCounters);
@@ -221,7 +212,6 @@ public class SqlStatementResourceHelper
       throw DruidException.defensive("Expected worker count to be set for 
stage[%d]", finalStage);
     }
 
-
     List<PageInformation> pages = new ArrayList<>();
     for (int partitionNumber = 0; partitionNumber < totalPartitions; 
partitionNumber++) {
       for (int workerNumber = 0; workerNumber < totalWorkerCount; 
workerNumber++) {
@@ -230,7 +220,7 @@ public class SqlStatementResourceHelper
         if (workerCounter != null && workerCounter.getMap() != null) {
           QueryCounterSnapshot channelCounters = 
workerCounter.getMap().get("output");
 
-          if (channelCounters != null && channelCounters instanceof 
ChannelCounters.Snapshot) {
+          if (channelCounters instanceof ChannelCounters.Snapshot) {
             long rows = 0L;
             long size = 0L;
 
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
index 72e7d345a3d..b50129c73e0 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
@@ -179,6 +179,65 @@ public class SqlMSQStatementResourcePostTest extends 
MSQTestBase
     );
   }
 
+  @Test
+  public void emptyInsert()
+  {
+    Response response = resource.doPost(new SqlQuery(
+        "insert into foo1 select  __time, dim1 , count(*) as cnt from foo 
where dim1 is not null and __time < TIMESTAMP '1971-01-01 00:00:00' group by 1, 
2 PARTITIONED by day clustered by dim1",
+        null,
+        false,
+        false,
+        false,
+        ImmutableMap.<String, Object>builder()
+                    .putAll(defaultAsyncContext())
+                    .build(),
+        null
+    ), SqlStatementResourceTest.makeOkRequest());
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
response.getStatus());
+
+    SqlStatementResult actual = (SqlStatementResult) response.getEntity();
+
+    SqlStatementResult expected = new SqlStatementResult(
+        actual.getQueryId(),
+        SqlStatementState.SUCCESS,
+        MSQTestOverlordServiceClient.CREATED_TIME,
+        null,
+        MSQTestOverlordServiceClient.DURATION,
+        new ResultSetInformation(0L, 0L, null, "foo1", null, null),
+        null
+    );
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void emptyReplace()
+  {
+    Response response = resource.doPost(new SqlQuery(
+        "replace into foo1 overwrite all select  __time, dim1 , count(*) as 
cnt from foo where dim1 is not null and __time < TIMESTAMP '1971-01-01 
00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1",
+        null,
+        false,
+        false,
+        false,
+        ImmutableMap.<String, Object>builder()
+                    .putAll(defaultAsyncContext())
+                    .build(),
+        null
+    ), SqlStatementResourceTest.makeOkRequest());
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
response.getStatus());
+
+    SqlStatementResult actual = (SqlStatementResult) response.getEntity();
+
+    SqlStatementResult expected = new SqlStatementResult(
+        actual.getQueryId(),
+        SqlStatementState.SUCCESS,
+        MSQTestOverlordServiceClient.CREATED_TIME,
+        null,
+        MSQTestOverlordServiceClient.DURATION,
+        new ResultSetInformation(0L, 0L, null, "foo1", null, null),
+        null
+    );
+    Assert.assertEquals(expected, actual);
+  }
 
   @Test
   public void insertCannotBeEmptyFaultTest()
@@ -433,7 +492,6 @@ public class SqlMSQStatementResourcePostTest extends 
MSQTestBase
     rows.add(ImmutableList.of(1466985600000L, "GiftBot"));
     rows.add(ImmutableList.of(1466985600000L, "GiftBot"));
 
-
     Assert.assertEquals(rows, 
SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
         sqlStatementResult.getQueryId(),
         null,
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 8891df46f2d..0146fcf9bd8 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -1404,8 +1404,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
               );
               rows.addAll(new 
FrameChannelSequence(inputChannelFactory.openChannel(
                   finalStage.getId(),
-                  pageInformation.getWorker(),
-                  pageInformation.getPartition()
+                  pageInformation.getWorker() == null ? 0 : 
pageInformation.getWorker(),
+                  pageInformation.getPartition() == null ? 0 : 
pageInformation.getPartition()
               )).flatMap(frame -> SqlStatementResourceHelper.getResultSequence(
                   msqControllerTask,
                   finalStage,
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
index 0254a61a2c7..65bd004c9b5 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
@@ -23,11 +23,13 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.msq.counters.CounterSnapshots;
 import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
 import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
+import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
 import org.apache.druid.msq.indexing.report.MSQStagesReport;
 import org.apache.druid.msq.indexing.report.MSQStatusReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
@@ -38,6 +40,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,9 +49,6 @@ import java.util.TreeMap;
 
 public class SqlStatementResourceHelperTest
 {
-
-  private static final Logger log = new 
Logger(SqlStatementResourceHelperTest.class);
-
   @Test
   public void testDistinctPartitionsOnEachWorker()
   {
@@ -83,7 +83,7 @@ public class SqlStatementResourceHelperTest
         payload,
         DurableStorageMSQDestination.instance()
     );
-    validatePages(pages.get(), createValidationMap(worker0, worker1, worker2));
+    validatePages(pages.get(), getExpectedPageInformationList(worker0, 
worker1, worker2));
   }
 
   @Test
@@ -122,7 +122,7 @@ public class SqlStatementResourceHelperTest
         payload,
         DurableStorageMSQDestination.instance()
     );
-    validatePages(pages.get(), createValidationMap(worker0, worker1, worker2));
+    validatePages(pages.get(), getExpectedPageInformationList(worker0, 
worker1, worker2));
   }
 
 
@@ -160,7 +160,7 @@ public class SqlStatementResourceHelperTest
 
     Optional<List<PageInformation>> pages =
         SqlStatementResourceHelper.populatePageList(payload, 
DurableStorageMSQDestination.instance());
-    validatePages(pages.get(), createValidationMap(worker0, worker1, worker2, 
worker3));
+    validatePages(pages.get(), getExpectedPageInformationList(worker0, 
worker1, worker2, worker3));
   }
 
 
@@ -200,10 +200,9 @@ public class SqlStatementResourceHelperTest
         payload,
         DurableStorageMSQDestination.instance()
     );
-    validatePages(pages.get(), createValidationMap(worker0, worker1, worker2, 
worker3));
+    validatePages(pages.get(), getExpectedPageInformationList(worker0, 
worker1, worker2, worker3));
   }
 
-
   @Test
   public void testConsecutivePartitionsOnEachWorker()
   {
@@ -240,41 +239,148 @@ public class SqlStatementResourceHelperTest
         payload,
         DurableStorageMSQDestination.instance()
     );
-    validatePages(pages.get(), createValidationMap(worker0, worker1, worker2, 
worker3));
+    validatePages(pages.get(), getExpectedPageInformationList(worker0, 
worker1, worker2, worker3));
   }
 
+  /**
+   * Durable storage destination applies only to SELECT queries and unlike 
ingest queries, empty worker counters will not
+   * be reported in this case. See {@link 
#testEmptyCountersForTaskReportDestination()} and {@link 
#testEmptyCountersForDataSourceDestination()}
+   * to see the difference.
+   */
+  @Test
+  public void testEmptyCountersForDurableStorageDestination()
+  {
+    CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree();
+    ChannelCounters worker0 = createChannelCounters(new int[0]);
+
+    counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of()));
+
+    MSQTaskReportPayload payload = new MSQTaskReportPayload(
+        new MSQStatusReport(
+          TaskState.SUCCESS,
+          null,
+          new ArrayDeque<>(),
+          null,
+          0,
+          new HashMap<>(),
+          1,
+          2,
+          null
+        ),
+        MSQStagesReport.create(
+            MSQTaskReportTest.QUERY_DEFINITION,
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(0, 1),
+            ImmutableMap.of(0, 1)
+        ),
+        counterSnapshots,
+        null
+    );
+
+    Optional<List<PageInformation>> pages = 
SqlStatementResourceHelper.populatePageList(
+        payload,
+        DurableStorageMSQDestination.instance()
+    );
+    validatePages(pages.get(), getExpectedPageInformationList(worker0));
+  }
 
-  private void validatePages(
-      List<PageInformation> pageList,
-      Map<Integer, Map<Integer, Pair<Long, Long>>> partitionToWorkerToRowsBytes
-  )
+  @Test
+  public void testEmptyCountersForTaskReportDestination()
   {
-    int currentPage = 0;
-    for (Map.Entry<Integer, Map<Integer, Pair<Long, Long>>> partitionWorker : 
partitionToWorkerToRowsBytes.entrySet()) {
-      for (Map.Entry<Integer, Pair<Long, Long>> workerRowsBytes : 
partitionWorker.getValue().entrySet()) {
-        PageInformation pageInformation = pageList.get(currentPage);
-        Assert.assertEquals(currentPage, pageInformation.getId());
-        Assert.assertEquals(workerRowsBytes.getValue().lhs, 
pageInformation.getNumRows());
-        Assert.assertEquals(workerRowsBytes.getValue().rhs, 
pageInformation.getSizeInBytes());
-        Assert.assertEquals(partitionWorker.getKey(), 
pageInformation.getPartition());
-        Assert.assertEquals(workerRowsBytes.getKey(), 
pageInformation.getWorker());
-        log.debug(pageInformation.toString());
-        currentPage++;
-      }
-    }
-    Assert.assertEquals(currentPage, pageList.size());
+    CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree();
+    counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of()));
+
+    MSQTaskReportPayload payload = new MSQTaskReportPayload(
+        new MSQStatusReport(
+            TaskState.SUCCESS,
+            null,
+            new ArrayDeque<>(),
+            null,
+            0,
+            new HashMap<>(),
+            1,
+            2,
+            null
+        ),
+        MSQStagesReport.create(
+            MSQTaskReportTest.QUERY_DEFINITION,
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(0, 1),
+            ImmutableMap.of(0, 1)
+        ),
+        counterSnapshots,
+        null
+    );
+
+    Optional<List<PageInformation>> pages = 
SqlStatementResourceHelper.populatePageList(
+        payload,
+        TaskReportMSQDestination.instance()
+    );
+    Assert.assertTrue(pages.isPresent());
+    Assert.assertEquals(1, pages.get().size());
+    Assert.assertEquals(new PageInformation(0, 0L, 0L), pages.get().get(0));
+  }
+
+  @Test
+  public void testEmptyCountersForDataSourceDestination()
+  {
+    CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree();
+    counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of()));
+
+    MSQTaskReportPayload payload = new MSQTaskReportPayload(
+        new MSQStatusReport(
+            TaskState.SUCCESS,
+            null,
+            new ArrayDeque<>(),
+            null,
+            0,
+            new HashMap<>(),
+            1,
+            2,
+            null
+        ),
+        MSQStagesReport.create(
+            MSQTaskReportTest.QUERY_DEFINITION,
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(0, 1),
+            ImmutableMap.of(0, 1)
+        ),
+        counterSnapshots,
+        null
+    );
+
+    Optional<List<PageInformation>> pages = 
SqlStatementResourceHelper.populatePageList(
+        payload,
+        new DataSourceMSQDestination(
+            "test",
+            Granularities.DAY,
+            null,
+            null
+        )
+    );
+    Assert.assertTrue(pages.isPresent());
+    Assert.assertEquals(1, pages.get().size());
+    Assert.assertEquals(new PageInformation(0, 0L, null), pages.get().get(0));
   }
 
-  private Map<Integer, Map<Integer, Pair<Long, Long>>> createValidationMap(
-      ChannelCounters... workers
-  )
+  private void validatePages(List<PageInformation> actualPageList, 
List<PageInformation> expectedPageList)
   {
-    if (workers == null || workers.length == 0) {
-      return new HashMap<>();
+    Assert.assertEquals(expectedPageList.size(), actualPageList.size());
+    Assert.assertEquals(expectedPageList, actualPageList);
+  }
+
+  private List<PageInformation> 
getExpectedPageInformationList(ChannelCounters... workerCounters)
+  {
+    List<PageInformation> pageInformationList = new ArrayList<>();
+    if (workerCounters == null || workerCounters.length == 0) {
+      return pageInformationList;
     } else {
       Map<Integer, Map<Integer, Pair<Long, Long>>> 
partitionToWorkerToRowsBytes = new TreeMap<>();
-      for (int worker = 0; worker < workers.length; worker++) {
-        ChannelCounters.Snapshot workerCounter = workers[worker].snapshot();
+      for (int worker = 0; worker < workerCounters.length; worker++) {
+        ChannelCounters.Snapshot workerCounter = 
workerCounters[worker].snapshot();
         for (int partition = 0; workerCounter != null && partition < 
workerCounter.getRows().length; partition++) {
           Map<Integer, Pair<Long, Long>> workerMap = 
partitionToWorkerToRowsBytes.computeIfAbsent(
               partition,
@@ -290,14 +396,27 @@ public class SqlStatementResourceHelperTest
                 )
             );
           }
+        }
+      }
 
+      // Construct the pages based on the order of partitionToWorkerMap.
+      for (Map.Entry<Integer, Map<Integer, Pair<Long, Long>>> 
partitionToWorkerMap : partitionToWorkerToRowsBytes.entrySet()) {
+        for (Map.Entry<Integer, Pair<Long, Long>> workerToRowsBytesMap : 
partitionToWorkerMap.getValue().entrySet()) {
+          pageInformationList.add(
+              new PageInformation(
+                  pageInformationList.size(),
+                  workerToRowsBytesMap.getValue().lhs,
+                  workerToRowsBytesMap.getValue().rhs,
+                  workerToRowsBytesMap.getKey(),
+                  partitionToWorkerMap.getKey()
+              )
+          );
         }
       }
-      return partitionToWorkerToRowsBytes;
+      return pageInformationList;
     }
   }
 
-
   private ChannelCounters createChannelCounters(int[] partitions)
   {
     if (partitions == null || partitions.length == 0) {
diff --git 
a/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java 
b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
index 3c15e4b8554..59251ab4c70 100644
--- a/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
+++ b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
@@ -76,12 +76,12 @@ public class QueryHostFinder
     Server chosenServer = 
avaticaConnectionBalancer.pickServer(getAllServers(), connectionId);
     assertServerFound(
         chosenServer,
-        "No server found for Avatica request with connectionId [%s]",
+        "No server found for Avatica request with connectionId[%s]",
         connectionId
     );
 
     log.debug(
-        "Balancer class [%s] sending request with connectionId [%s] to server: 
%s",
+        "Balancer class[%s] sending request with connectionId[%s] to 
server[%s]",
         avaticaConnectionBalancer.getClass(),
         connectionId,
         chosenServer.getHost()
@@ -120,7 +120,7 @@ public class QueryHostFinder
     Server server = findDefaultServer();
     assertServerFound(
         server,
-        "There are no available brokers. Please check that your brokers are 
running and " + " healthy."
+        "There are no available brokers. Please check that your brokers are 
running and healthy."
     );
     return server;
   }
@@ -136,7 +136,7 @@ public class QueryHostFinder
 
     if (server == null) {
       log.error(
-          "No server found for serviceName [%s]. Using backup",
+          "No server found for serviceName[%s]. Using backup",
           serviceName
       );
 
@@ -144,7 +144,7 @@ public class QueryHostFinder
 
       if (server == null) {
         log.error(
-            "No backup found for serviceName [%s]. Using default [%s]",
+            "No backup found for serviceName[%s]. Using default[%s]",
             serviceName,
             hostSelector.getDefaultServiceName()
         );
@@ -162,7 +162,7 @@ public class QueryHostFinder
   private void assertServerFound(Server server, String messageFormat, 
Object... args)
   {
     if (server != null) {
-      log.debug("Selected [%s]", server.getHost());
+      log.debug("Selected server[%s]", server.getHost());
       return;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to