This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 49a3f42 Add an integration test for null-only columns (#12365)
49a3f42 is described below
commit 49a3f4291a667ee33797ddcc83b3a274da333c2d
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Mar 28 16:40:45 2022 -0700
Add an integration test for null-only columns (#12365)
* integration test for null-only-columns
* metadata query
* fix test
---
.../druid/testing/clients/TaskResponseObject.java | 10 +-
.../druid/tests/indexer/AbstractIndexerTest.java | 6 +
.../indexer/AbstractKafkaIndexingServiceTest.java | 9 +-
.../AbstractKinesisIndexingServiceTest.java | 7 +
.../tests/indexer/AbstractStreamIndexingTest.java | 29 ++-
.../druid/tests/indexer/ITNilColumnTest.java | 221 +++++++++++++++++++++
.../stream/data/supervisor_spec_template.json | 2 +-
7 files changed, 273 insertions(+), 11 deletions(-)
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
index 38f0a86..3062a6f 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
@@ -49,31 +49,31 @@ public class TaskResponseObject
this.status = status;
}
- @SuppressWarnings("unused") // Used by Jackson serialization?
+ @JsonProperty
public String getId()
{
return id;
}
- @SuppressWarnings("unused") // Used by Jackson serialization?
+ @JsonProperty
public String getType()
{
return type;
}
- @SuppressWarnings("unused") // Used by Jackson serialization?
+ @JsonProperty
public DateTime getCreatedTime()
{
return createdTime;
}
- @SuppressWarnings("unused") // Used by Jackson serialization?
+ @JsonProperty
public DateTime getQueueInsertionTime()
{
return queueInsertionTime;
}
- @SuppressWarnings("unused") // Used by Jackson serialization?
+ @JsonProperty
public TaskState getStatus()
{
return status;
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index f8fd41a..0c88619 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -31,7 +31,9 @@ import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
import org.apache.druid.testing.clients.TaskResponseObject;
+import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.joda.time.Interval;
@@ -62,6 +64,10 @@ public abstract class AbstractIndexerTest
protected ObjectMapper smileMapper;
@Inject
protected TestQueryHelper queryHelper;
+ @Inject
+ protected SqlTestQueryHelper sqlQueryHelper;
+ @Inject
+ protected DataLoaderHelper dataLoaderHelper;
@Inject
protected IntegrationTestingConfig config;
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
index 5ea11e6..06f2841 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.testing.utils.KafkaUtil;
import org.apache.druid.testing.utils.StreamAdminClient;
import org.apache.druid.testing.utils.StreamEventWriter;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
@@ -53,6 +54,7 @@ public abstract class AbstractKafkaIndexingServiceTest
extends AbstractStreamInd
String fullDatasourceName,
String parserType,
String parserOrInputFormat,
+ List<String> dimensions,
IntegrationTestingConfig config
)
{
@@ -117,13 +119,16 @@ public abstract class AbstractKafkaIndexingServiceTest
extends AbstractStreamInd
"%%STREAM_PROPERTIES_KEY%%",
"consumerProperties"
);
-
spec = StringUtils.replace(
spec,
"%%SCHEMA_REGISTRY_HOST%%",
StringUtils.format("http://%s",
config.getSchemaRegistryInternalHost())
);
-
+ spec = StringUtils.replace(
+ spec,
+ "%%DIMENSIONS%%",
+ jsonMapper.writeValueAsString(dimensions)
+ );
return StringUtils.replace(
spec,
"%%STREAM_PROPERTIES_VALUE%%",
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
index 5854a62..b83ea95 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.testing.utils.StreamAdminClient;
import org.apache.druid.testing.utils.StreamEventWriter;
import javax.annotation.Nullable;
+import java.util.List;
import java.util.function.Function;
public abstract class AbstractKinesisIndexingServiceTest extends
AbstractStreamIndexingTest
@@ -59,6 +60,7 @@ public abstract class AbstractKinesisIndexingServiceTest
extends AbstractStreamI
String fullDatasourceName,
String parserType,
String parserOrInputFormat,
+ List<String> dimensions,
IntegrationTestingConfig config
)
{
@@ -122,6 +124,11 @@ public abstract class AbstractKinesisIndexingServiceTest
extends AbstractStreamI
"%%SCHEMA_REGISTRY_HOST%%",
StringUtils.format("http://%s",
config.getSchemaRegistryInternalHost())
);
+ spec = StringUtils.replace(
+ spec,
+ "%%DIMENSIONS%%",
+ jsonMapper.writeValueAsString(dimensions)
+ );
return StringUtils.replace(
spec,
"%%STREAM_PROPERTIES_VALUE%%",
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 5ce8728..c98d249 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.tests.indexer;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
@@ -73,7 +74,7 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
// The value to this tag is a timestamp that can be used by a lambda
function to remove unused stream.
private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
private static final int STREAM_SHARD_COUNT = 2;
- private static final long CYCLE_PADDING_MS = 100;
+ protected static final long CYCLE_PADDING_MS = 100;
private static final String QUERIES_FILE =
"/stream/queries/stream_index_queries.json";
private static final String SUPERVISOR_SPEC_TEMPLATE_FILE =
"supervisor_spec_template.json";
@@ -93,9 +94,24 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
protected static final String INPUT_FORMAT = "inputFormat";
protected static final String INPUT_ROW_PARSER = "parser";
- private static final String JSON_INPUT_FORMAT_PATH =
+ protected static final String JSON_INPUT_FORMAT_PATH =
String.join("/", DATA_RESOURCE_ROOT, "json", INPUT_FORMAT_SPEC_DIR,
"input_format.json");
+ protected static final List<String> DEFAULT_DIMENSIONS = ImmutableList.of(
+ "page",
+ "language",
+ "user",
+ "unpatrolled",
+ "newPage",
+ "robot",
+ "anonymous",
+ "namespace",
+ "continent",
+ "country",
+ "region",
+ "city"
+ );
+
@Inject
private DruidClusterAdminClient druidClusterAdminClient;
@@ -117,6 +133,7 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
String fullDatasourceName,
String parserType,
String parserOrInputFormat,
+ List<String> dimensions,
IntegrationTestingConfig config
);
@@ -625,7 +642,7 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
}
}
- private void verifyIngestedData(GeneratedTestConfig generatedTestConfig,
long numWritten) throws Exception
+ protected void verifyIngestedData(GeneratedTestConfig generatedTestConfig,
long numWritten) throws Exception
{
// Wait for supervisor to consume events
LOG.info("Waiting for stream indexing tasks to consume events");
@@ -721,6 +738,11 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
public GeneratedTestConfig(String parserType, String parserOrInputFormat)
throws Exception
{
+ this(parserType, parserOrInputFormat, DEFAULT_DIMENSIONS);
+ }
+
+ public GeneratedTestConfig(String parserType, String parserOrInputFormat,
List<String> dimensions) throws Exception
+ {
streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
String datasource = getTestNamePrefix() + "_indexing_service_test_" +
UUID.randomUUID();
Map<String, String> tags = ImmutableMap.of(
@@ -741,6 +763,7 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
fullDatasourceName,
parserType,
parserOrInputFormat,
+ dimensions,
config
);
streamQueryPropsTransform =
generateStreamQueryPropsTransform(streamName, fullDatasourceName);
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java
new file mode 100644
index 0000000..9a9557d
--- /dev/null
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.clients.TaskResponseObject;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.JsonEventSerializer;
+import org.apache.druid.testing.utils.SqlQueryWithResults;
+import org.apache.druid.testing.utils.StreamEventWriter;
+import org.apache.druid.testing.utils.StreamGenerator;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITNilColumnTest extends AbstractKafkaIndexingServiceTest
+{
+ private static final Logger LOG = new Logger(ITNilColumnTest.class);
+ private static final String NIL_DIM1 = "nilDim1";
+ private static final String NIL_DIM2 = "nilDim2";
+ // Supervisor dimensions: NIL_DIM1, then DEFAULT_DIMENSIONS, then NIL_DIM2 (built in the constructor).
+ private final List<String> dimensions;
+
+ public ITNilColumnTest()
+ {
+ this.dimensions = new ArrayList<>(DEFAULT_DIMENSIONS.size() + 2);
+ dimensions.add(NIL_DIM1);
+ dimensions.addAll(DEFAULT_DIMENSIONS);
+ dimensions.add(NIL_DIM2);
+ }
+
+ @Override
+ public String getTestNamePrefix()
+ {
+ return "nil-column-test";
+ }
+
+ @BeforeClass
+ public void beforeClass() throws Exception
+ {
+ doBeforeClass();
+ }
+ // Runs the nil-column SQL checks twice: before terminating the supervisor and after its tasks exit.
+ @Test
+ public void testQueryNilColumnBeforeAndAfterPublishingSegments() throws
Exception
+ {
+ final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
+ INPUT_FORMAT,
+ getResourceAsString(JSON_INPUT_FORMAT_PATH),
+ dimensions
+ );
+ try (
+ final Closeable closer = createResourceCloser(generatedTestConfig);
+ final StreamEventWriter streamEventWriter =
createStreamEventWriter(config, true)
+ ) {
+ final String taskSpec =
generatedTestConfig.getStreamIngestionPropsTransform()
+
.apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
+ LOG.info("supervisorSpec: [%s]\n", taskSpec);
+ // Start supervisor
+ generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+ LOG.info("Submitted supervisor");
+ // Write TOTAL_NUMBER_OF_SECOND seconds of generated events in a single run
+ final StreamGenerator streamGenerator = new
WikipediaStreamEventStreamGenerator(
+ new JsonEventSerializer(jsonMapper),
+ EVENTS_PER_SECOND,
+ CYCLE_PADDING_MS
+ );
+ long numWritten = streamGenerator.run(
+ generatedTestConfig.getStreamName(),
+ streamEventWriter,
+ TOTAL_NUMBER_OF_SECOND,
+ FIRST_EVENT_TIME
+ );
+ // Verify supervisor is healthy (RUNNING) before the first round of queries
+ ITRetryUtil.retryUntil(
+ () ->
BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+ true,
+ 10000,
+ 30,
+ "Waiting for supervisor to be healthy"
+ );
+ // 60 events should have been ingested as per EVENTS_PER_SECOND and
TOTAL_NUMBER_OF_SECOND.
+ // Since maxRowsInMemory is set to 500,000, every row should be in
incrementalIndex.
+ // So, let's test if SQL finds nil dimensions from incrementalIndexes.
+
dataLoaderHelper.waitUntilDatasourceIsReady(generatedTestConfig.getFullDatasourceName());
+ final List<SqlQueryWithResults> queryWithResults =
getQueryWithResults(generatedTestConfig);
+
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(queryWithResults));
+ final List<SqlQueryWithResults> metadataQueryWithResults =
getMetadataQueryWithResults(generatedTestConfig);
+
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(metadataQueryWithResults));
+
+ // Terminate the supervisor, then wait until no index_kafka task for this datasource is running
+ indexer.terminateSupervisor(generatedTestConfig.getSupervisorId());
+ ITRetryUtil.retryUntilTrue(
+ () -> {
+ List<TaskResponseObject> tasks = indexer
+ .getRunningTasks()
+ .stream()
+ .filter(task ->
task.getId().contains(generatedTestConfig.getFullDatasourceName()))
+ .filter(task -> "index_kafka".equals(task.getType()))
+ .collect(Collectors.toList());
+ LOG.info("[%s] tasks are running", tasks.stream().map(task -> {
+ try {
+ return jsonMapper.writeValueAsString(task);
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList()));
+ return tasks.isEmpty();
+ },
+ "Waiting for all tasks to stop"
+ );
+
+ // Now, we should have published all segments.
+ // Let's test if SQL finds nil dimensions from queryableIndexes.
+
dataLoaderHelper.waitUntilDatasourceIsReady(generatedTestConfig.getFullDatasourceName());
+ verifyIngestedData(generatedTestConfig, numWritten);
+
+
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(queryWithResults));
+
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(metadataQueryWithResults));
+ }
+ }
+ // Expects zero rows in which either nil dimension is non-null.
+ private static List<SqlQueryWithResults>
getQueryWithResults(GeneratedTestConfig generatedTestConfig)
+ {
+ return ImmutableList.of(
+ new SqlQueryWithResults(
+ new SqlQuery(
+ StringUtils.format(
+ "SELECT count(*) FROM \"%s\" WHERE %s IS NOT NULL OR %s IS
NOT NULL",
+ generatedTestConfig.getFullDatasourceName(),
+ NIL_DIM1,
+ NIL_DIM2
+ ),
+ null,
+ false,
+ false,
+ false,
+ null,
+ null
+ ),
+ ImmutableList.of(ImmutableMap.of("EXPR$0", 0))
+ )
+ );
+ }
+ // Expects INFORMATION_SCHEMA.COLUMNS to list both nil dimensions as nullable VARCHAR columns.
+ private List<SqlQueryWithResults>
getMetadataQueryWithResults(GeneratedTestConfig generatedTestConfig)
+ {
+ return ImmutableList.of(
+ new SqlQueryWithResults(
+ new SqlQuery(
+ StringUtils.format(
+ "SELECT COLUMN_NAME, IS_NULLABLE, DATA_TYPE"
+ + " FROM INFORMATION_SCHEMA.COLUMNS"
+ + " WHERE TABLE_NAME = '%s' AND COLUMN_NAME IN ('%s',
'%s')",
+ generatedTestConfig.getFullDatasourceName(),
+ NIL_DIM1,
+ NIL_DIM2
+ ),
+ null,
+ false,
+ false,
+ false,
+ null,
+ null
+ ),
+ ImmutableList.of(
+ ImmutableMap.of(
+ "COLUMN_NAME",
+ NIL_DIM1,
+ "IS_NULLABLE",
+ "YES",
+ "DATA_TYPE",
+ "VARCHAR"
+ ),
+ ImmutableMap.of(
+ "COLUMN_NAME",
+ NIL_DIM2,
+ "IS_NULLABLE",
+ "YES",
+ "DATA_TYPE",
+ "VARCHAR"
+ )
+ )
+ )
+ );
+ }
+}
diff --git
a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
index d921165..bc66812 100644
---
a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
+++
b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
@@ -8,7 +8,7 @@
"format": "auto"
},
"dimensionsSpec": {
- "dimensions": ["page", "language", "user", "unpatrolled", "newPage",
"robot", "anonymous", "namespace", "continent", "country", "region", "city"],
+ "dimensions": %%DIMENSIONS%%,
"dimensionExclusions": [],
"spatialDimensions": []
},
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]