This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 49a3f42  Add an integration test for null-only columns (#12365)
49a3f42 is described below

commit 49a3f4291a667ee33797ddcc83b3a274da333c2d
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Mar 28 16:40:45 2022 -0700

    Add an integration test for null-only columns (#12365)
    
    * integration test for null-only-columns
    
    * metadata query
    
    * fix test
---
 .../druid/testing/clients/TaskResponseObject.java  |  10 +-
 .../druid/tests/indexer/AbstractIndexerTest.java   |   6 +
 .../indexer/AbstractKafkaIndexingServiceTest.java  |   9 +-
 .../AbstractKinesisIndexingServiceTest.java        |   7 +
 .../tests/indexer/AbstractStreamIndexingTest.java  |  29 ++-
 .../druid/tests/indexer/ITNilColumnTest.java       | 221 +++++++++++++++++++++
 .../stream/data/supervisor_spec_template.json      |   2 +-
 7 files changed, 273 insertions(+), 11 deletions(-)

diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
index 38f0a86..3062a6f 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
@@ -49,31 +49,31 @@ public class TaskResponseObject
     this.status = status;
   }
 
-  @SuppressWarnings("unused") // Used by Jackson serialization?
+  @JsonProperty
   public String getId()
   {
     return id;
   }
 
-  @SuppressWarnings("unused") // Used by Jackson serialization?
+  @JsonProperty
   public String getType()
   {
     return type;
   }
 
-  @SuppressWarnings("unused") // Used by Jackson serialization?
+  @JsonProperty
   public DateTime getCreatedTime()
   {
     return createdTime;
   }
 
-  @SuppressWarnings("unused") // Used by Jackson serialization?
+  @JsonProperty
   public DateTime getQueueInsertionTime()
   {
     return queueInsertionTime;
   }
 
-  @SuppressWarnings("unused") // Used by Jackson serialization?
+  @JsonProperty
   public TaskState getStatus()
   {
     return status;
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index f8fd41a..0c88619 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -31,7 +31,9 @@ import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
 import org.apache.druid.testing.clients.OverlordResourceTestClient;
 import org.apache.druid.testing.clients.TaskResponseObject;
+import org.apache.druid.testing.utils.DataLoaderHelper;
 import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.SqlTestQueryHelper;
 import org.apache.druid.testing.utils.TestQueryHelper;
 import org.joda.time.Interval;
 
@@ -62,6 +64,10 @@ public abstract class AbstractIndexerTest
   protected ObjectMapper smileMapper;
   @Inject
   protected TestQueryHelper queryHelper;
+  @Inject
+  protected SqlTestQueryHelper sqlQueryHelper;
+  @Inject
+  protected DataLoaderHelper dataLoaderHelper;
 
   @Inject
   protected IntegrationTestingConfig config;
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
index 5ea11e6..06f2841 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.testing.utils.KafkaUtil;
 import org.apache.druid.testing.utils.StreamAdminClient;
 import org.apache.druid.testing.utils.StreamEventWriter;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.Function;
@@ -53,6 +54,7 @@ public abstract class AbstractKafkaIndexingServiceTest 
extends AbstractStreamInd
       String fullDatasourceName,
       String parserType,
       String parserOrInputFormat,
+      List<String> dimensions,
       IntegrationTestingConfig config
   )
   {
@@ -117,13 +119,16 @@ public abstract class AbstractKafkaIndexingServiceTest 
extends AbstractStreamInd
             "%%STREAM_PROPERTIES_KEY%%",
             "consumerProperties"
         );
-
         spec = StringUtils.replace(
             spec,
             "%%SCHEMA_REGISTRY_HOST%%",
             StringUtils.format("http://%s", 
config.getSchemaRegistryInternalHost())
         );
-
+        spec = StringUtils.replace(
+            spec,
+            "%%DIMENSIONS%%",
+            jsonMapper.writeValueAsString(dimensions)
+        );
         return StringUtils.replace(
             spec,
             "%%STREAM_PROPERTIES_VALUE%%",
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
index 5854a62..b83ea95 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.testing.utils.StreamAdminClient;
 import org.apache.druid.testing.utils.StreamEventWriter;
 
 import javax.annotation.Nullable;
+import java.util.List;
 import java.util.function.Function;
 
 public abstract class AbstractKinesisIndexingServiceTest extends 
AbstractStreamIndexingTest
@@ -59,6 +60,7 @@ public abstract class AbstractKinesisIndexingServiceTest 
extends AbstractStreamI
       String fullDatasourceName,
       String parserType,
       String parserOrInputFormat,
+      List<String> dimensions,
       IntegrationTestingConfig config
   )
   {
@@ -122,6 +124,11 @@ public abstract class AbstractKinesisIndexingServiceTest 
extends AbstractStreamI
                 "%%SCHEMA_REGISTRY_HOST%%",
                 StringUtils.format("http://%s", 
config.getSchemaRegistryInternalHost())
         );
+        spec = StringUtils.replace(
+            spec,
+            "%%DIMENSIONS%%",
+            jsonMapper.writeValueAsString(dimensions)
+        );
         return StringUtils.replace(
             spec,
             "%%STREAM_PROPERTIES_VALUE%%",
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 5ce8728..c98d249 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.tests.indexer;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
@@ -73,7 +74,7 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
   // The value to this tag is a timestamp that can be used by a lambda 
function to remove unused stream.
   private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
   private static final int STREAM_SHARD_COUNT = 2;
-  private static final long CYCLE_PADDING_MS = 100;
+  protected static final long CYCLE_PADDING_MS = 100;
 
   private static final String QUERIES_FILE = 
"/stream/queries/stream_index_queries.json";
   private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = 
"supervisor_spec_template.json";
@@ -93,9 +94,24 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
   protected static final String INPUT_FORMAT = "inputFormat";
   protected static final String INPUT_ROW_PARSER = "parser";
 
-  private static final String JSON_INPUT_FORMAT_PATH =
+  protected static final String JSON_INPUT_FORMAT_PATH =
       String.join("/", DATA_RESOURCE_ROOT, "json", INPUT_FORMAT_SPEC_DIR, 
"input_format.json");
 
+  protected static final List<String> DEFAULT_DIMENSIONS = ImmutableList.of(
+      "page",
+      "language",
+      "user",
+      "unpatrolled",
+      "newPage",
+      "robot",
+      "anonymous",
+      "namespace",
+      "continent",
+      "country",
+      "region",
+      "city"
+  );
+
   @Inject
   private DruidClusterAdminClient druidClusterAdminClient;
 
@@ -117,6 +133,7 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
       String fullDatasourceName,
       String parserType,
       String parserOrInputFormat,
+      List<String> dimensions,
       IntegrationTestingConfig config
   );
 
@@ -625,7 +642,7 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
     }
   }
 
-  private void verifyIngestedData(GeneratedTestConfig generatedTestConfig, 
long numWritten) throws Exception
+  protected void verifyIngestedData(GeneratedTestConfig generatedTestConfig, 
long numWritten) throws Exception
   {
     // Wait for supervisor to consume events
     LOG.info("Waiting for stream indexing tasks to consume events");
@@ -721,6 +738,11 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
 
     public GeneratedTestConfig(String parserType, String parserOrInputFormat) 
throws Exception
     {
+      this(parserType, parserOrInputFormat, DEFAULT_DIMENSIONS);
+    }
+
+    public GeneratedTestConfig(String parserType, String parserOrInputFormat, 
List<String> dimensions) throws Exception
+    {
       streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
       String datasource = getTestNamePrefix() + "_indexing_service_test_" + 
UUID.randomUUID();
       Map<String, String> tags = ImmutableMap.of(
@@ -741,6 +763,7 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
           fullDatasourceName,
           parserType,
           parserOrInputFormat,
+          dimensions,
           config
       );
       streamQueryPropsTransform = 
generateStreamQueryPropsTransform(streamName, fullDatasourceName);
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java
new file mode 100644
index 0000000..9a9557d
--- /dev/null
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.clients.TaskResponseObject;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.JsonEventSerializer;
+import org.apache.druid.testing.utils.SqlQueryWithResults;
+import org.apache.druid.testing.utils.StreamEventWriter;
+import org.apache.druid.testing.utils.StreamGenerator;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITNilColumnTest extends AbstractKafkaIndexingServiceTest
+{
+  private static final Logger LOG = new Logger(ITNilColumnTest.class);
+  private static final String NIL_DIM1 = "nilDim1";
+  private static final String NIL_DIM2 = "nilDim2";
+
+  private final List<String> dimensions;
+
+  public ITNilColumnTest()
+  {
+    this.dimensions = new ArrayList<>(DEFAULT_DIMENSIONS.size() + 2);
+    dimensions.add(NIL_DIM1);
+    dimensions.addAll(DEFAULT_DIMENSIONS);
+    dimensions.add(NIL_DIM2);
+  }
+
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "nil-column-test";
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  @Test
+  public void testQueryNilColumnBeforeAndAfterPublishingSegments() throws 
Exception
+  {
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
+        INPUT_FORMAT,
+        getResourceAsString(JSON_INPUT_FORMAT_PATH),
+        dimensions
+    );
+    try (
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = 
createStreamEventWriter(config, true)
+    ) {
+      final String taskSpec = 
generatedTestConfig.getStreamIngestionPropsTransform()
+                                                 
.apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+      LOG.info("Submitted supervisor");
+      // Start generating the data
+      final StreamGenerator streamGenerator = new 
WikipediaStreamEventStreamGenerator(
+          new JsonEventSerializer(jsonMapper),
+          EVENTS_PER_SECOND,
+          CYCLE_PADDING_MS
+      );
+      long numWritten = streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          TOTAL_NUMBER_OF_SECOND,
+          FIRST_EVENT_TIME
+      );
+      // Verify supervisor is healthy before termination
+      ITRetryUtil.retryUntil(
+          () -> 
BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // 60 events should have been ingested as per EVENTS_PER_SECOND and 
TOTAL_NUMBER_OF_SECOND.
+      // Since maxRowsInMemory is set to 500,000, every row should be in 
incrementalIndex.
+      // So, let's test if SQL finds nil dimensions from incrementalIndexes.
+      
dataLoaderHelper.waitUntilDatasourceIsReady(generatedTestConfig.getFullDatasourceName());
+      final List<SqlQueryWithResults> queryWithResults = 
getQueryWithResults(generatedTestConfig);
+      
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(queryWithResults));
+      final List<SqlQueryWithResults> metadataQueryWithResults = 
getMetadataQueryWithResults(generatedTestConfig);
+      
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(metadataQueryWithResults));
+
+      // Terminate the supervisor
+      indexer.terminateSupervisor(generatedTestConfig.getSupervisorId());
+      ITRetryUtil.retryUntilTrue(
+          () -> {
+            List<TaskResponseObject> tasks = indexer
+                .getRunningTasks()
+                .stream()
+                .filter(task -> 
task.getId().contains(generatedTestConfig.getFullDatasourceName()))
+                .filter(task -> "index_kafka".equals(task.getType()))
+                .collect(Collectors.toList());
+            LOG.info("[%s] tasks are running", tasks.stream().map(task -> {
+              try {
+                return jsonMapper.writeValueAsString(task);
+              }
+              catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+              }
+            }).collect(Collectors.toList()));
+            return tasks.isEmpty();
+          },
+          "Waiting for all tasks to stop"
+      );
+
+      // Now, we should have published all segments.
+      // Let's test if SQL finds nil dimensions from queryableIndexes.
+      
dataLoaderHelper.waitUntilDatasourceIsReady(generatedTestConfig.getFullDatasourceName());
+      verifyIngestedData(generatedTestConfig, numWritten);
+
+      
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(queryWithResults));
+      
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(metadataQueryWithResults));
+    }
+  }
+
+  private static List<SqlQueryWithResults> 
getQueryWithResults(GeneratedTestConfig generatedTestConfig)
+  {
+    return ImmutableList.of(
+        new SqlQueryWithResults(
+            new SqlQuery(
+                StringUtils.format(
+                    "SELECT count(*) FROM \"%s\" WHERE %s IS NOT NULL OR %s IS 
NOT NULL",
+                    generatedTestConfig.getFullDatasourceName(),
+                    NIL_DIM1,
+                    NIL_DIM2
+                ),
+                null,
+                false,
+                false,
+                false,
+                null,
+                null
+            ),
+            ImmutableList.of(ImmutableMap.of("EXPR$0", 0))
+        )
+    );
+  }
+
+  private List<SqlQueryWithResults> 
getMetadataQueryWithResults(GeneratedTestConfig generatedTestConfig)
+  {
+    return ImmutableList.of(
+        new SqlQueryWithResults(
+            new SqlQuery(
+                StringUtils.format(
+                    "SELECT COLUMN_NAME, IS_NULLABLE, DATA_TYPE"
+                    + " FROM INFORMATION_SCHEMA.COLUMNS"
+                    + " WHERE TABLE_NAME = '%s' AND COLUMN_NAME IN ('%s', 
'%s')",
+                    generatedTestConfig.getFullDatasourceName(),
+                    NIL_DIM1,
+                    NIL_DIM2
+                ),
+                null,
+                false,
+                false,
+                false,
+                null,
+                null
+            ),
+            ImmutableList.of(
+                ImmutableMap.of(
+                    "COLUMN_NAME",
+                    NIL_DIM1,
+                    "IS_NULLABLE",
+                    "YES",
+                    "DATA_TYPE",
+                    "VARCHAR"
+                ),
+                ImmutableMap.of(
+                    "COLUMN_NAME",
+                    NIL_DIM2,
+                    "IS_NULLABLE",
+                    "YES",
+                    "DATA_TYPE",
+                    "VARCHAR"
+                )
+            )
+        )
+    );
+  }
+}
diff --git 
a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
 
b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
index d921165..bc66812 100644
--- 
a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
+++ 
b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
@@ -8,7 +8,7 @@
       "format": "auto"
     },
     "dimensionsSpec": {
-      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", 
"robot", "anonymous", "namespace", "continent", "country", "region", "city"],
+      "dimensions": %%DIMENSIONS%%,
       "dimensionExclusions": [],
       "spatialDimensions": []
     },

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to