This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5d2ed33b897 Place __time in signatures according to sort order.
(#16958)
5d2ed33b897 is described below
commit 5d2ed33b8975405916aba91fb91c8b408562de94
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Aug 26 21:45:51 2024 -0700
Place __time in signatures according to sort order. (#16958)
* Place __time in signatures according to sort order.
Updates a variety of places to put __time in row signatures according
to its position in the sort order, rather than always first, including:
- InputSourceSampler.
- ScanQueryEngine (in the default signature when "columns" is empty).
- Various StorageAdapters. This also changes the column order
reported by segmentMetadata queries, and therefore the column order
in SQL schemas as well.
Follow-up to #16849.
* Fix compilation.
* Additional fixes.
* Fix.
* Fix style.
* Omit nonexistent columns from the row signature.
* Fix tests.
---
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 28 ++++++++--------
.../org/apache/druid/msq/test/MSQTestBase.java | 22 +++++++------
.../overlord/sampler/InputSourceSampler.java | 16 ++++------
.../apache/druid/query/scan/ScanQueryEngine.java | 13 +++-----
.../druid/segment/FilteredStorageAdapter.java | 7 ++++
.../segment/QueryableIndexStorageAdapter.java | 37 ++++++++++++++++++++++
.../org/apache/druid/segment/StorageAdapter.java | 7 ++--
.../apache/druid/segment/UnnestStorageAdapter.java | 18 +++++++++++
.../apache/druid/segment/column/ColumnType.java | 6 ++++
.../IncrementalIndexStorageAdapter.java | 14 ++++++++
.../join/HashJoinSegmentStorageAdapter.java | 20 ++++++++++++
.../join/table/BroadcastSegmentIndexedTable.java | 8 +----
.../org/apache/druid/frame/key/KeyTestUtils.java | 10 +++---
.../druid/query/lookup/LookupSegmentTest.java | 14 ++++++++
.../apache/druid/segment/IndexMergerTestBase.java | 20 ++++++------
.../join/HashJoinSegmentStorageAdapterTest.java | 25 +++++++++++++++
.../table/BroadcastSegmentIndexedTableTest.java | 6 ++--
.../apache/druid/segment/realtime/sink/Sink.java | 23 +++++++-------
18 files changed, 211 insertions(+), 83 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 96da93c34d0..e470a951877 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -279,8 +279,8 @@ public class MSQReplaceTest extends MSQTestBase
public void testReplaceOnFooWithAllClusteredByDimExplicitSort(String
contextName, Map<String, Object> context)
{
RowSignature rowSignature = RowSignature.builder()
- .add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
+ .add("__time", ColumnType.LONG)
.add("m1", ColumnType.FLOAT)
.build();
@@ -323,12 +323,12 @@ public class MSQReplaceTest extends MSQTestBase
.setExpectedShardSpec(DimensionRangeShardSpec.class)
.setExpectedResultRows(
ImmutableList.of(
- new Object[]{946684800000L,
NullHandling.sqlCompatible() ? "" : null, 1.0f},
- new Object[]{978307200000L, "1", 4.0f},
- new Object[]{946771200000L, "10.1", 2.0f},
- new Object[]{946857600000L, "2", 3.0f},
- new Object[]{978480000000L, "abc", 6.0f},
- new Object[]{978393600000L, "def", 5.0f}
+ new Object[]{NullHandling.sqlCompatible() ? "" :
null, 946684800000L, 1.0f},
+ new Object[]{"1", 978307200000L, 4.0f},
+ new Object[]{"10.1", 946771200000L, 2.0f},
+ new Object[]{"2", 946857600000L, 3.0f},
+ new Object[]{"abc", 978480000000L, 6.0f},
+ new Object[]{"def", 978393600000L, 5.0f}
)
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
@@ -365,8 +365,8 @@ public class MSQReplaceTest extends MSQTestBase
// forceSegmentSortByTime = false. (Same expectations as the prior test,
// testReplaceOnFooWithAllClusteredByDimExplicitSort.)
RowSignature rowSignature = RowSignature.builder()
- .add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
+ .add("__time", ColumnType.LONG)
.add("m1", ColumnType.FLOAT)
.build();
@@ -409,12 +409,12 @@ public class MSQReplaceTest extends MSQTestBase
.setExpectedShardSpec(DimensionRangeShardSpec.class)
.setExpectedResultRows(
ImmutableList.of(
- new Object[]{946684800000L,
NullHandling.sqlCompatible() ? "" : null, 1.0f},
- new Object[]{978307200000L, "1", 4.0f},
- new Object[]{946771200000L, "10.1", 2.0f},
- new Object[]{946857600000L, "2", 3.0f},
- new Object[]{978480000000L, "abc", 6.0f},
- new Object[]{978393600000L, "def", 5.0f}
+ new Object[]{NullHandling.sqlCompatible() ? "" :
null, 946684800000L, 1.0f},
+ new Object[]{"1", 978307200000L, 4.0f},
+ new Object[]{"10.1", 946771200000L, 2.0f},
+ new Object[]{"2", 946857600000L, 3.0f},
+ new Object[]{"abc", 978480000000L, 6.0f},
+ new Object[]{"def", 978393600000L, 5.0f}
)
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 5076e9f3d44..258b5c97751 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -141,6 +141,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -1288,17 +1289,20 @@ public class MSQTestBase extends BaseCalciteQueryTest
Assert.assertEquals(expectedDestinationIntervals,
destination.getReplaceTimeChunks());
}
if (expectedSegments != null) {
+ final int timeIndex =
+
MSQResultsReport.ColumnAndType.toRowSignature(expectedRowSignature)
+
.indexOf(ColumnHolder.TIME_COLUMN_NAME);
Assert.assertEquals(expectedSegments,
segmentIdVsOutputRowsMap.keySet());
for (Object[] row : transformedOutputRows) {
- List<SegmentId> diskSegmentList = segmentIdVsOutputRowsMap.keySet()
- .stream()
-
.filter(segmentId -> segmentId.getInterval()
-
.contains((Long) row[0]))
-
.filter(segmentId -> {
-
List<List<Object>> lists = segmentIdVsOutputRowsMap.get(segmentId);
- return
lists.contains(Arrays.asList(row));
- })
-
.collect(Collectors.toList());
+ List<SegmentId> diskSegmentList = segmentIdVsOutputRowsMap
+ .keySet()
+ .stream()
+ .filter(segmentId -> segmentId.getInterval().contains((Long)
row[timeIndex]))
+ .filter(segmentId -> {
+ List<List<Object>> lists =
segmentIdVsOutputRowsMap.get(segmentId);
+ return lists.contains(Arrays.asList(row));
+ })
+ .collect(Collectors.toList());
if (diskSegmentList.size() != 1) {
throw new IllegalStateException("Single key in multiple
partitions");
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index 4e5e8634f18..f98287124ed 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -241,14 +241,12 @@ public class InputSourceSampler
List<DimensionSchema> physicalDimensionSchemas = new ArrayList<>();
RowSignature.Builder signatureBuilder = RowSignature.builder();
- signatureBuilder.add(
- ColumnHolder.TIME_COLUMN_NAME,
-
index.getColumnCapabilities(ColumnHolder.TIME_COLUMN_NAME).toColumnType()
- );
- for (IncrementalIndex.DimensionDesc dimensionDesc :
index.getDimensions()) {
- if
(!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionDesc.getName())) {
- final ColumnType columnType =
dimensionDesc.getCapabilities().toColumnType();
- signatureBuilder.add(dimensionDesc.getName(), columnType);
+ for (final String dimensionName : index.getDimensionNames(true)) {
+ if (ColumnHolder.TIME_COLUMN_NAME.equals(dimensionName)) {
+ signatureBuilder.addTimeColumn();
+ } else if
(!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionName)) {
+ final IncrementalIndex.DimensionDesc dimensionDesc =
index.getDimension(dimensionName);
+ signatureBuilder.add(dimensionDesc.getName(),
ColumnType.fromCapabilities(dimensionDesc.getCapabilities()));
// use explicitly specified dimension schema if it exists
if (dataSchema != null &&
dataSchema.getDimensionsSpec() != null &&
@@ -271,7 +269,7 @@ public class InputSourceSampler
if
(!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(aggregatorFactory.getName())) {
signatureBuilder.add(
aggregatorFactory.getName(),
-
index.getColumnCapabilities(aggregatorFactory.getName()).toColumnType()
+
ColumnType.fromCapabilities(index.getColumnCapabilities(aggregatorFactory.getName()))
);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
index ca2c2725c6f..9dd72d15fa4 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
@@ -45,6 +45,7 @@ import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.timeline.SegmentId;
@@ -98,13 +99,11 @@ public class ScanQueryEngine
} else {
final Set<String> availableColumns = Sets.newLinkedHashSet(
Iterables.concat(
- Collections.singleton(ColumnHolder.TIME_COLUMN_NAME),
+ adapter.getRowSignature().getColumnNames(),
Iterables.transform(
Arrays.asList(query.getVirtualColumns().getVirtualColumns()),
VirtualColumn::getOutputName
- ),
- adapter.getAvailableDimensions(),
- adapter.getAvailableMetrics()
+ )
)
);
@@ -152,11 +151,7 @@ public class ScanQueryEngine
for (String column : allColumns) {
final BaseObjectColumnValueSelector selector =
factory.makeColumnValueSelector(column);
ColumnCapabilities columnCapabilities =
factory.getColumnCapabilities(column);
- rowSignatureBuilder.add(
- column,
- columnCapabilities == null ? null :
columnCapabilities.toColumnType()
- );
-
+ rowSignatureBuilder.add(column,
ColumnType.fromCapabilities(columnCapabilities));
columnSelectors.add(selector);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/FilteredStorageAdapter.java
b/processing/src/main/java/org/apache/druid/segment/FilteredStorageAdapter.java
index 3fcfe2dd78a..46b9f5fe061 100644
---
a/processing/src/main/java/org/apache/druid/segment/FilteredStorageAdapter.java
+++
b/processing/src/main/java/org/apache/druid/segment/FilteredStorageAdapter.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.filter.AndFilter;
import org.joda.time.Interval;
@@ -69,6 +70,12 @@ public class FilteredStorageAdapter implements StorageAdapter
return baseStorageAdapter.getInterval();
}
+ @Override
+ public RowSignature getRowSignature()
+ {
+ return baseStorageAdapter.getRowSignature();
+ }
+
@Override
public Indexed<String> getAvailableDimensions()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java
index e2c88071607..a0e23e71f74 100644
---
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java
+++
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java
@@ -19,11 +19,14 @@
package org.apache.druid.segment;
+import org.apache.druid.query.OrderBy;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.DictionaryEncodedColumn;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.Indexed;
import
org.apache.druid.segment.index.semantic.DictionaryEncodedStringValueIndex;
import org.joda.time.DateTime;
@@ -55,6 +58,40 @@ public class QueryableIndexStorageAdapter implements
StorageAdapter
return index.getDataInterval();
}
+ @Override
+ public RowSignature getRowSignature()
+ {
+ final LinkedHashSet<String> columns = new LinkedHashSet<>();
+
+ for (final OrderBy orderBy : index.getOrdering()) {
+ columns.add(orderBy.getColumnName());
+ }
+
+ // Add __time after the defined ordering, if __time wasn't part of it.
+ columns.add(ColumnHolder.TIME_COLUMN_NAME);
+
+ for (final String dimension : getAvailableDimensions()) {
+ columns.add(dimension);
+ }
+
+ for (final String metric : getAvailableMetrics()) {
+ columns.add(metric);
+ }
+
+ final RowSignature.Builder builder = RowSignature.builder();
+ for (final String column : columns) {
+ final ColumnType columnType =
ColumnType.fromCapabilities(index.getColumnCapabilities(column));
+
+ // index.getOrdering() may include columns that don't exist, such as if
they were omitted due to
+ // being 100% nulls. Don't add those to the row signature.
+ if (columnType != null) {
+ builder.add(column, columnType);
+ }
+ }
+
+ return builder.build();
+ }
+
@Override
public Indexed<String> getAvailableDimensions()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java
b/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java
index 3ad6c930002..c0949692e4f 100644
--- a/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java
+++ b/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java
@@ -26,6 +26,7 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.OrderBy;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.vector.VectorCursor;
@@ -34,7 +35,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
-import java.util.Optional;
/**
*
@@ -140,10 +140,7 @@ public interface StorageAdapter extends CursorFactory,
ColumnInspector, CursorHo
builder.addTimeColumn();
for (final String column : Iterables.concat(getAvailableDimensions(),
getAvailableMetrics())) {
- builder.add(
- column,
-
Optional.ofNullable(getColumnCapabilities(column)).map(ColumnCapabilities::toColumnType).orElse(null)
- );
+ builder.add(column,
ColumnType.fromCapabilities(getColumnCapabilities(column)));
}
return builder.build();
diff --git
a/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
index 435771b4780..2f9552a1d3c 100644
---
a/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
+++
b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
@@ -35,6 +35,8 @@ import org.apache.druid.query.filter.NullFilter;
import org.apache.druid.query.filter.RangeFilter;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.Indexed;
@@ -169,6 +171,22 @@ public class UnnestStorageAdapter implements StorageAdapter
return baseAdapter.getInterval();
}
+ @Override
+ public RowSignature getRowSignature()
+ {
+ final RowSignature.Builder builder = RowSignature.builder();
+
+ final RowSignature baseSignature = baseAdapter.getRowSignature();
+ for (int i = 0; i < baseSignature.size(); i++) {
+ final String column = baseSignature.getColumnName(i);
+ if (!outputColumnName.equals(column)) {
+ builder.add(column,
ColumnType.fromCapabilities(getColumnCapabilities(column)));
+ }
+ }
+
+ return builder.add(outputColumnName,
ColumnType.fromCapabilities(getColumnCapabilities(outputColumnName))).build();
+ }
+
@Override
public Indexed<String> getAvailableDimensions()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java
b/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java
index dbfed07749c..b670d8f4370 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java
@@ -124,6 +124,12 @@ public class ColumnType extends
BaseTypeSignature<ValueType>
return Types.fromString(ColumnTypeFactory.getInstance(), typeName);
}
+ @Nullable
+ public static ColumnType fromCapabilities(@Nullable ColumnCapabilities
capabilities)
+ {
+ return capabilities != null ? capabilities.toColumnType() : null;
+ }
+
public static ColumnType ofArray(ColumnType elementType)
{
return ColumnTypeFactory.getInstance().ofArray(elementType);
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
index ab92a00897b..c9a6d209697 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.incremental;
+import com.google.common.collect.Iterables;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.DimensionDictionarySelector;
@@ -30,6 +31,7 @@ import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.joda.time.Interval;
@@ -122,6 +124,18 @@ public class IncrementalIndexStorageAdapter implements
StorageAdapter
return index.getInterval();
}
+ @Override
+ public RowSignature getRowSignature()
+ {
+ final RowSignature.Builder builder = RowSignature.builder();
+
+ for (final String column : Iterables.concat(index.getDimensionNames(true),
index.getMetricNames())) {
+ builder.add(column,
ColumnType.fromCapabilities(index.getColumnCapabilities(column)));
+ }
+
+ return builder.build();
+ }
+
@Override
public Indexed<String> getAvailableDimensions()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java
b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java
index f3668ca45e1..765ab4fbd91 100644
---
a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java
+++
b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java
@@ -33,6 +33,8 @@ import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.filter.Filters;
@@ -99,6 +101,24 @@ public class HashJoinSegmentStorageAdapter implements
StorageAdapter
return baseAdapter.getInterval();
}
+ @Override
+ public RowSignature getRowSignature()
+ {
+ // Use a Set since we may encounter duplicates, if a field from a Joinable
shadows one of the base fields.
+ final LinkedHashSet<String> columns = new
LinkedHashSet<>(baseAdapter.getRowSignature().getColumnNames());
+
+ for (final JoinableClause clause : clauses) {
+ columns.addAll(clause.getAvailableColumnsPrefixed());
+ }
+
+ final RowSignature.Builder builder = RowSignature.builder();
+ for (final String column : columns) {
+ builder.add(column,
ColumnType.fromCapabilities(getColumnCapabilities(column)));
+ }
+
+ return builder.build();
+ }
+
@Override
public Indexed<String> getAvailableDimensions()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
index 9591dc315f0..5fe6d8a698a 100644
---
a/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
+++
b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
@@ -39,7 +39,6 @@ import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.BaseColumn;
-import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.ReadableOffset;
@@ -88,12 +87,7 @@ public class BroadcastSegmentIndexedTable implements
IndexedTable
segment.getId()
);
- RowSignature.Builder sigBuilder = RowSignature.builder();
- sigBuilder.add(ColumnHolder.TIME_COLUMN_NAME, ColumnType.LONG);
- for (String column : queryableIndex.getColumnNames()) {
- sigBuilder.add(column,
adapter.getColumnCapabilities(column).toColumnType());
- }
- this.rowSignature = sigBuilder.build();
+ this.rowSignature = adapter.getRowSignature();
// initialize keycolumn index builders
final ArrayList<RowBasedIndexBuilder> indexBuilders = new
ArrayList<>(rowSignature.size());
diff --git
a/processing/src/test/java/org/apache/druid/frame/key/KeyTestUtils.java
b/processing/src/test/java/org/apache/druid/frame/key/KeyTestUtils.java
index e5ab3c203f4..cae9439a331 100644
--- a/processing/src/test/java/org/apache/druid/frame/key/KeyTestUtils.java
+++ b/processing/src/test/java/org/apache/druid/frame/key/KeyTestUtils.java
@@ -30,13 +30,11 @@ import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
-import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
public class KeyTestUtils
{
@@ -56,10 +54,10 @@ public class KeyTestUtils
final RowSignature.Builder builder = RowSignature.builder();
for (final KeyColumn keyColumn : keyColumns) {
- final ColumnCapabilities capabilities =
inspector.getColumnCapabilities(keyColumn.columnName());
- final ColumnType columnType =
-
Optional.ofNullable(capabilities).map(ColumnCapabilities::toColumnType).orElse(null);
- builder.add(keyColumn.columnName(), columnType);
+ builder.add(
+ keyColumn.columnName(),
+
ColumnType.fromCapabilities(inspector.getColumnCapabilities(keyColumn.columnName()))
+ );
}
return builder.build();
diff --git
a/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java
b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java
index 5826301bcd7..142b23b8391 100644
---
a/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java
+++
b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java
@@ -32,6 +32,8 @@ import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.RowBasedStorageAdapter;
import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.CoreMatchers;
@@ -119,6 +121,18 @@ public class LookupSegmentTest
Assert.assertNull(LOOKUP_SEGMENT.asQueryableIndex());
}
+ @Test
+ public void test_asStorageAdapter_getRowSignature()
+ {
+ Assert.assertEquals(
+ RowSignature.builder()
+ .add("k", ColumnType.STRING)
+ .add("v", ColumnType.STRING)
+ .build(),
+ LOOKUP_SEGMENT.asStorageAdapter().getRowSignature()
+ );
+ }
+
@Test
public void test_asStorageAdapter_getAvailableDimensions()
{
diff --git
a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
index 1f46534266f..b8c9721b5a0 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
@@ -255,12 +255,12 @@ public class IndexMergerTestBase extends
InitializedNullHandlingTest
Assert.assertEquals(6, index.getNumRows());
Assert.assertEquals(
ImmutableList.of(
- ImmutableList.of(timestamp, "1", "2", 1L),
- ImmutableList.of(timestamp, "1", "2", 1L),
- ImmutableList.of(timestamp + 1, "1", "2", 1L),
- ImmutableList.of(timestamp, "3", "4", 1L),
- ImmutableList.of(timestamp, "3", "4", 1L),
- ImmutableList.of(timestamp + 1, "3", "4", 1L)
+ ImmutableList.of("1", "2", timestamp, 1L),
+ ImmutableList.of("1", "2", timestamp, 1L),
+ ImmutableList.of("1", "2", timestamp + 1, 1L),
+ ImmutableList.of("3", "4", timestamp, 1L),
+ ImmutableList.of("3", "4", timestamp, 1L),
+ ImmutableList.of("3", "4", timestamp + 1, 1L)
),
FrameTestUtil.readRowsFromAdapter(new
QueryableIndexStorageAdapter(index), null, false).toList()
);
@@ -325,10 +325,10 @@ public class IndexMergerTestBase extends
InitializedNullHandlingTest
Assert.assertEquals(4, index.getNumRows());
Assert.assertEquals(
ImmutableList.of(
- ImmutableList.of(timestamp, "1", "2", 2L),
- ImmutableList.of(timestamp + 1, "1", "2", 1L),
- ImmutableList.of(timestamp, "3", "4", 2L),
- ImmutableList.of(timestamp + 1, "3", "4", 1L)
+ ImmutableList.of("1", "2", timestamp, 2L),
+ ImmutableList.of("1", "2", timestamp + 1, 1L),
+ ImmutableList.of("3", "4", timestamp, 2L),
+ ImmutableList.of("3", "4", timestamp + 1, 1L)
),
FrameTestUtil.readRowsFromAdapter(new
QueryableIndexStorageAdapter(index), null, false).toList()
);
diff --git
a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java
b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java
index b26da1ceba7..7ae04d9fcb1 100644
---
a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java
@@ -58,6 +58,31 @@ public class HashJoinSegmentStorageAdapterTest extends
BaseHashJoinSegmentStorag
);
}
+ @Test
+ public void test_getRowSignature_factToCountry()
+ {
+ Assert.assertEquals(
+ ImmutableList.of(
+ "__time",
+ "channel",
+ "regionIsoCode",
+ "countryNumber",
+ "countryIsoCode",
+ "user",
+ "isRobot",
+ "isAnonymous",
+ "namespace",
+ "page",
+ "delta",
+ "channel_uniques",
+ "c1.countryNumber",
+ "c1.countryIsoCode",
+ "c1.countryName"
+ ),
+
Lists.newArrayList(makeFactToCountrySegment().getRowSignature().getColumnNames())
+ );
+ }
+
@Test
public void test_getAvailableDimensions_factToCountry()
{
diff --git
a/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java
b/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java
index 73daadc7dd1..9b8f7f4d916 100644
---
a/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java
@@ -43,6 +43,7 @@ import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.TestIndex;
@@ -141,9 +142,8 @@ public class BroadcastSegmentIndexedTableTest extends
InitializedNullHandlingTes
segment.getTotalSpace()
);
backingSegment = (QueryableIndexSegment) factory.factorize(dataSegment,
segment, false, SegmentLazyLoadFailCallback.NOOP);
-
- columnNames =
ImmutableList.<String>builder().add(ColumnHolder.TIME_COLUMN_NAME)
-
.addAll(backingSegment.asQueryableIndex().getColumnNames()).build();
+ columnNames =
+ new
QueryableIndexStorageAdapter(backingSegment.asQueryableIndex()).getRowSignature().getColumnNames();
broadcastTable = new BroadcastSegmentIndexedTable(backingSegment,
keyColumns, dataSegment.getVersion());
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
index 8d4fc0dd7f8..5e59c0e9b15 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
@@ -24,6 +24,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@@ -37,7 +38,6 @@ import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnFormat;
-import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
@@ -92,6 +92,7 @@ public class Sink implements Iterable<FireHydrant>,
Overshadowable<Sink>
private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
// columns excluding current index (the in-memory fire hydrant), includes
__time column
+ @GuardedBy("hydrantLock")
private final LinkedHashSet<String> columnsExcludingCurrIndex = new
LinkedHashSet<>();
// column types for columns in {@code columnsExcludingCurrIndex}
@@ -397,6 +398,7 @@ public class Sink implements Iterable<FireHydrant>,
Overshadowable<Sink>
/**
* Merge the column from the index with the existing columns.
*/
+ @GuardedBy("hydrantLock")
private void overwriteIndexDimensions(StorageAdapter storageAdapter)
{
RowSignature rowSignature = storageAdapter.getRowSignature();
@@ -414,20 +416,19 @@ public class Sink implements Iterable<FireHydrant>,
Overshadowable<Sink>
synchronized (hydrantLock) {
RowSignature.Builder builder = RowSignature.builder();
- builder.addTimeColumn();
-
+ // Add columns from columnsExcludingCurrIndex.
for (String dim : columnsExcludingCurrIndex) {
- if (!ColumnHolder.TIME_COLUMN_NAME.equals(dim)) {
- builder.add(dim, columnTypeExcludingCurrIndex.get(dim));
- }
+ builder.add(dim, columnTypeExcludingCurrIndex.get(dim));
}
- IncrementalIndexStorageAdapter incrementalIndexStorageAdapter = new
IncrementalIndexStorageAdapter(currHydrant.getIndex());
- RowSignature incrementalIndexSignature =
incrementalIndexStorageAdapter.getRowSignature();
+ // Add columns from the currHydrant that do not yet exist in
columnsExcludingCurrIndex.
+ IncrementalIndexStorageAdapter currStorageAdapter =
+ new IncrementalIndexStorageAdapter(currHydrant.getIndex());
+ RowSignature currSignature = currStorageAdapter.getRowSignature();
- for (String dim : incrementalIndexSignature.getColumnNames()) {
- if (!columnsExcludingCurrIndex.contains(dim) &&
!ColumnHolder.TIME_COLUMN_NAME.equals(dim)) {
- builder.add(dim,
incrementalIndexSignature.getColumnType(dim).orElse(null));
+ for (String dim : currSignature.getColumnNames()) {
+ if (!columnsExcludingCurrIndex.contains(dim)) {
+ builder.add(dim, currSignature.getColumnType(dim).orElse(null));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]