This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.18.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.18.0 by this push:
     new 17d3022  DruidSegmentReader should work if timestamp is specified as a dimension (#9530) (#9566)
17d3022 is described below

commit 17d30222da268c18df6d5525c64c8d8d7ed4a7b6
Author: Suneet Saldanha <44787917+sunee...@users.noreply.github.com>
AuthorDate: Wed Mar 25 17:03:42 2020 -0700

    DruidSegmentReader should work if timestamp is specified as a dimension (#9530) (#9566)
    
    * DruidSegmentReader should work if timestamp is specified as a dimension
    
    * Add integration tests
    
    Tests for compaction and re-indexing a datasource with the timestamp column
    
    * Instructions to run integration tests against quickstart
    
    * Address PR feedback
---
 .../druid/indexing/input/DruidSegmentReader.java   | 28 +++++--
 integration-tests/README.md                        | 37 +++++++++-
 integration-tests/quickstart-it.json               | 16 ++++
 .../druid/tests/indexer/ITCompactionTaskTest.java  | 34 ++++++---
 .../apache/druid/tests/indexer/ITIndexerTest.java  | 53 ++++++++++++-
 .../wikipedia_reindex_druid_input_source_task.json | 51 +++++++++++++
 .../wikipedia_with_timestamp_index_task.json       | 86 ++++++++++++++++++++++
 7 files changed, 283 insertions(+), 22 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
index 5f45eb8..f2ce056 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
@@ -27,7 +27,6 @@ import org.apache.druid.data.input.InputEntity.CleanableFile;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.IntermediateRowParsingReader;
 import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -61,6 +60,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 
 public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String, Object>>
 {
@@ -122,7 +122,7 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
   @Override
   protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow) throws ParseException
   {
-    final DateTime timestamp = (DateTime) intermediateRow.get(TimestampSpec.DEFAULT_COLUMN);
+    final DateTime timestamp = (DateTime) intermediateRow.get(ColumnHolder.TIME_COLUMN_NAME);
     return Collections.singletonList(new MapBasedInputRow(timestamp.getMillis(), dimensions, intermediateRow));
   }
 
@@ -209,8 +209,9 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
     {
       this.cursor = cursor;
 
-      timestampColumnSelector =
-          cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+      timestampColumnSelector = cursor
+          .getColumnSelectorFactory()
+          .makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
 
       dimSelectors = new HashMap<>();
       for (String dim : dimensionNames) {
@@ -225,8 +226,9 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
 
       metSelectors = new HashMap<>();
       for (String metric : metricNames) {
-        final BaseObjectColumnValueSelector metricSelector =
-            cursor.getColumnSelectorFactory().makeColumnValueSelector(metric);
+        final BaseObjectColumnValueSelector metricSelector = cursor
+            .getColumnSelectorFactory()
+            .makeColumnValueSelector(metric);
         metSelectors.put(metric, metricSelector);
       }
     }
@@ -240,9 +242,10 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
     @Override
     public Map<String, Object> next()
     {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
       final Map<String, Object> theEvent = Maps.newLinkedHashMap();
-      final long timestamp = timestampColumnSelector.getLong();
-      theEvent.put(TimestampSpec.DEFAULT_COLUMN, DateTimes.utc(timestamp));
 
       for (Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
         final String dim = dimSelector.getKey();
@@ -270,6 +273,15 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
           theEvent.put(metric, value);
         }
       }
+
+      // Timestamp is added last because we expect that the time column will always be a DateTime object.
+      // If it is added earlier, it can be overwritten by metrics or dimensions with the same name.
+      //
+      // If a user names a metric or dimension `__time`, it will be overwritten. This case should be rare since
+      // __time is reserved for the time column in druid segments.
+      final long timestamp = timestampColumnSelector.getLong();
+      theEvent.put(ColumnHolder.TIME_COLUMN_NAME, DateTimes.utc(timestamp));
+
       cursor.advance();
       return theEvent;
     }
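
The reordering in next() above can be reproduced outside Druid with a self-contained sketch (class name and sample values are hypothetical; this is an illustration, not code from the commit):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TimeColumnOrderingSketch
{
  // Mirrors ColumnHolder.TIME_COLUMN_NAME.
  private static final String TIME_COLUMN_NAME = "__time";

  public static void main(String[] args)
  {
    // If the reserved time key is written first, a same-named dimension or
    // metric written later silently clobbers the timestamp value.
    Map<String, Object> timeFirst = new LinkedHashMap<>();
    timeFirst.put(TIME_COLUMN_NAME, 1377911000000L);
    timeFirst.put(TIME_COLUMN_NAME, "a-colliding-dimension-value");
    System.out.println(timeFirst.get(TIME_COLUMN_NAME)); // a-colliding-dimension-value

    // Writing the time key last, as the patched next() does, lets the
    // timestamp win the collision instead.
    Map<String, Object> timeLast = new LinkedHashMap<>();
    timeLast.put(TIME_COLUMN_NAME, "a-colliding-dimension-value");
    timeLast.put(TIME_COLUMN_NAME, 1377911000000L);
    System.out.println(timeLast.get(TIME_COLUMN_NAME)); // 1377911000000
  }
}
```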
diff --git a/integration-tests/README.md b/integration-tests/README.md
index 0cc5b01..03c6851 100644
--- a/integration-tests/README.md
+++ b/integration-tests/README.md
@@ -66,7 +66,40 @@ Integration tests can also be run with either Java 8 or Java 11 by adding -Djvm.
 can either be 8 or 11.
 
 Druid's configuration (using Docker) can be overridden by providing -Doverride.config.path=<PATH_TO_FILE>. 
-The file must contain one property per line, the key must start with druid_ and the format should be snake case. 
+The file must contain one property per line, the key must start with `druid_` and the format should be snake case. 
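+
+For example, a minimal override file might contain (the property names shown
+are illustrative; any Druid runtime property can be set this way):
+
+```
+druid_processing_buffer_sizeBytes=25000000
+druid_storage_type=local
+```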
+
+Running Tests Using A Quickstart Cluster
+-------------------
+
+When writing integration tests, it can be helpful to test against a quickstart
+cluster so that you can set up remote debugging within your developer
+environment. This section walks you through setting up the integration tests
+so that they can run against a quickstart cluster running on your development
+machine.
+
+Note that not all features run by default on a quickstart cluster, so it may
+not make sense to run the entire test suite against this configuration.
+
+Make sure you have at least 6GB of memory available before you run the tests.
+
+The tests rely on files in the test/resources folder to exist under the path /resources,
+so create a symlink to make them available:
+
+```
+  ln -s ${DRUID_HOME}/integration-tests/src/test/resources /resources
+```
+
+Set the cluster config file environment variable to the quickstart config:
+```
+  export CONFIG_FILE=${DRUID_HOME}/integration-tests/quickstart-it.json
+```
+Note that quickstart does not run with SSL, so to trick the integration tests
+we specify the `*_tls_url` in the config to be the same as the HTTP URL.
+
+Then run the tests using a command similar to:
+```
+  mvn verify -P int-tests-config-file -Dit.test=<test_name>
+```
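+
+To step through a test from an IDE, one option (an assumption about your
+setup, not something this change requires) is the standard Failsafe debug
+switch, which makes the forked test JVM wait for a debugger on port 5005:
+```
+  mvn verify -P int-tests-config-file -Dit.test=<test_name> -Dmaven.failsafe.debug
+```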
 
 Running Tests Using A Configuration File for Any Cluster
 -------------------
@@ -90,7 +123,7 @@ To run tests on any druid cluster that is already running, create a configuratio
        "cloud_path": "<(optional) cloud_path for test data if running cloud 
integration test>",
     }
 
-Set the environment variable CONFIG_FILE to the name of the configuration file:
+Set the environment variable `CONFIG_FILE` to the name of the configuration file:
 ```
 export CONFIG_FILE=<config file name>
 ```
diff --git a/integration-tests/quickstart-it.json b/integration-tests/quickstart-it.json
new file mode 100644
index 0000000..c4b05e4
--- /dev/null
+++ b/integration-tests/quickstart-it.json
@@ -0,0 +1,16 @@
+{   
+   "broker_host" : "localhost",
+   "broker_port" : "8082",
+   "broker_tls_url" : "http://localhost:8082";,
+   "router_host" : "localhost",
+   "router_port" : "8888",
+   "router_tls_url" : "http://localhost:8888";,
+   "indexer_host" : "localhost",
+   "indexer_port" : "8081",
+   "historical_host" : "localhost",
+   "historical_port" : "8083",
+   "coordinator_host" : "localhost",
+   "coordinator_port" : "8081",
+   "middlemanager_host": "localhost",
+   "zookeeper_hosts": "localhost:2181"
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 2c0dbc3..ec6ea90 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -28,13 +28,14 @@ import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.testing.utils.ITRetryUtil;
 import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 
@@ -46,33 +47,47 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
   private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
   private static final String INDEX_DATASOURCE = "wikipedia_index_test";
+
   private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
 
+  private static final String INDEX_TASK_WITH_TIMESTAMP = "/indexer/wikipedia_with_timestamp_index_task.json";
+
   @Inject
   private IntegrationTestingConfig config;
 
   private String fullDatasourceName;
 
-  @BeforeSuite
-  public void setFullDatasourceName()
+  @BeforeMethod
+  public void setFullDatasourceName(Method method)
   {
-    fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix();
+    fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix() + "-" + method.getName();
   }
 
   @Test
   public void testCompaction() throws Exception
   {
-    loadData();
+    loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE);
+  }
+
+  @Test
+  public void testCompactionWithTimestampDimension() throws Exception
+  {
+    loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP, INDEX_QUERIES_RESOURCE);
+  }
+
+  private void loadDataAndCompact(String indexTask, String queriesResource) throws Exception
+  {
+    loadData(indexTask);
     final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
     intervalsBeforeCompaction.sort(null);
     try (final Closeable ignored = unloader(fullDatasourceName)) {
       String queryResponseTemplate;
       try {
-        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(INDEX_QUERIES_RESOURCE);
+        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queriesResource);
         queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
       }
       catch (IOException e) {
-        throw new ISE(e, "could not read query file: %s", INDEX_QUERIES_RESOURCE);
+        throw new ISE(e, "could not read query file: %s", queriesResource);
       }
 
       queryResponseTemplate = StringUtils.replace(
@@ -92,10 +107,9 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
       checkCompactionIntervals(intervalsBeforeCompaction);
     }
   }
-
-  private void loadData() throws Exception
+  private void loadData(String indexTask) throws Exception
   {
-    String taskSpec = getResourceAsString(INDEX_TASK);
+    String taskSpec = getResourceAsString(indexTask);
     taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
     final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index 6ff804e..93ad11a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -34,16 +34,26 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
   private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
   private static final String INDEX_DATASOURCE = "wikipedia_index_test";
 
+  private static final String INDEX_WITH_TIMESTAMP_TASK = "/indexer/wikipedia_with_timestamp_index_task.json";
+  // TODO: add queries that validate timestamp is different from the __time column since it is a dimension
+  // TODO: https://github.com/apache/druid/issues/9565
+  private static final String INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  private static final String INDEX_WITH_TIMESTAMP_DATASOURCE = "wikipedia_with_timestamp_index_test";
+
   private static final String REINDEX_TASK = "/indexer/wikipedia_reindex_task.json";
+  private static final String REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_reindex_druid_input_source_task.json";
   private static final String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_reindex_queries.json";
   private static final String REINDEX_DATASOURCE = "wikipedia_reindex_test";
 
   @Test
   public void testIndexData() throws Exception
   {
+    final String reindexDatasource = REINDEX_DATASOURCE + "-testIndexData";
+    final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testIndexData-druidInputSource";
     try (
         final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
-        final Closeable ignored2 = unloader(REINDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())
+        final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
+        final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
     ) {
       doIndexTest(
           INDEX_DATASOURCE,
@@ -55,10 +65,49 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
       );
       doReindexTest(
           INDEX_DATASOURCE,
-          REINDEX_DATASOURCE,
+          reindexDatasource,
+          REINDEX_TASK,
+          REINDEX_QUERIES_RESOURCE
+      );
+      doReindexTest(
+          INDEX_DATASOURCE,
+          reindexDatasourceWithDruidInputSource,
+          REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
+          REINDEX_QUERIES_RESOURCE
+      );
+    }
+  }
+
+  @Test
+  public void testReIndexDataWithTimestamp() throws Exception
+  {
+    final String reindexDatasource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp";
+    final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp-druidInputSource";
+    try (
+        final Closeable ignored1 = unloader(INDEX_WITH_TIMESTAMP_DATASOURCE + config.getExtraDatasourceNameSuffix());
+        final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
+        final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
+    ) {
+      doIndexTest(
+          INDEX_WITH_TIMESTAMP_DATASOURCE,
+          INDEX_WITH_TIMESTAMP_TASK,
+          INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE,
+          false,
+          true,
+          true
+      );
+      doReindexTest(
+          INDEX_WITH_TIMESTAMP_DATASOURCE,
+          reindexDatasource,
           REINDEX_TASK,
           REINDEX_QUERIES_RESOURCE
       );
+      doReindexTest(
+          INDEX_WITH_TIMESTAMP_DATASOURCE,
+          reindexDatasourceWithDruidInputSource,
+          REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
+          REINDEX_QUERIES_RESOURCE
+      );
     }
   }
 }
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task.json b/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task.json
new file mode 100644
index 0000000..3a5934c
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task.json
@@ -0,0 +1,51 @@
+{
+    "type": "index",
+    "spec": {
+        "ioConfig": {
+            "type": "index",
+            "inputSource": {
+                "type": "druid",
+                "dataSource": "%%DATASOURCE%%",
+                "interval": "2013-08-31/2013-09-01"
+            }
+        },
+        "tuningConfig": {
+            "type": "index",
+            "partitionsSpec": {
+                "type": "dynamic"
+            }
+        },
+        "dataSchema": {
+            "dataSource": "%%REINDEX_DATASOURCE%%",
+            "granularitySpec": {
+                "type": "uniform",
+                "queryGranularity": "SECOND",
+                "segmentGranularity": "DAY"
+            },
+            "timestampSpec": {
+                "column": "__time",
+                "format": "iso"
+            },
+            "dimensionsSpec": {
+                "dimensionExclusions" : ["robot", "continent"]
+            },
+            "metricsSpec": [
+                {
+                    "type": "doubleSum",
+                    "name": "added",
+                    "fieldName": "added"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "deleted",
+                    "fieldName": "deleted"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "delta",
+                    "fieldName": "delta"
+                }
+            ]
+        }
+    }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_with_timestamp_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_with_timestamp_index_task.json
new file mode 100644
index 0000000..0d66383
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_with_timestamp_index_task.json
@@ -0,0 +1,86 @@
+{
+    "type": "index",
+    "spec": {
+        "dataSchema": {
+            "dataSource": "%%DATASOURCE%%",
+            "metricsSpec": [
+                {
+                    "type": "count",
+                    "name": "count"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "added",
+                    "fieldName": "added"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "deleted",
+                    "fieldName": "deleted"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "delta",
+                    "fieldName": "delta"
+                },
+                {
+                    "name": "thetaSketch",
+                    "type": "thetaSketch",
+                    "fieldName": "user"
+                },
+                {
+                    "name": "quantilesDoublesSketch",
+                    "type": "quantilesDoublesSketch",
+                    "fieldName": "delta"
+                },
+                {
+                    "name": "HLLSketchBuild",
+                    "type": "HLLSketchBuild",
+                    "fieldName": "user"
+                }
+            ],
+            "granularitySpec": {
+                "segmentGranularity": "DAY",
+                "queryGranularity": "second",
+                "intervals" : [ "2013-08-31/2013-09-02" ]
+            },
+            "parser": {
+                "parseSpec": {
+                    "format" : "json",
+                    "timestampSpec": {
+                        "column": "timestamp"
+                    },
+                    "dimensionsSpec": {
+                        "dimensions": [
+                            "page",
+                            {"type": "string", "name": "language", "createBitmapIndex": false},
+                            "user",
+                            "unpatrolled",
+                            "newPage",
+                            "robot",
+                            "anonymous",
+                            "namespace",
+                            "continent",
+                            "country",
+                            "region",
+                            "city",
+                            "timestamp"
+                        ]
+                    }
+                }
+            }
+        },
+        "ioConfig": {
+            "type": "index",
+            "firehose": {
+                "type": "local",
+                "baseDir": "/resources/data/batch_index",
+                "filter": "wikipedia_index_data*"
+            }
+        },
+        "tuningConfig": {
+            "type": "index",
+            "maxRowsPerSegment": 3
+        }
+    }
+}
\ No newline at end of file
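
As a sketch of what the TODO in ITIndexerTest could eventually verify, a Druid SQL probe along these lines (datasource name illustrative; not part of this commit) would show the `timestamp` dimension and the reserved `__time` column side by side:

```
SELECT __time, "timestamp"
FROM "wikipedia_with_timestamp_index_test"
LIMIT 5
```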

