This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d2ed33b897 Place __time in signatures according to sort order. (#16958)
5d2ed33b897 is described below

commit 5d2ed33b8975405916aba91fb91c8b408562de94
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Aug 26 21:45:51 2024 -0700

    Place __time in signatures according to sort order. (#16958)
    
    * Place __time in signatures according to sort order.
    
    Updates a variety of places to put __time in row signatures according
    to its position in the sort order, rather than always first, including:
    
    - InputSourceSampler.
    - ScanQueryEngine (in the default signature when "columns" is empty).
    - Various StorageAdapters, which also have the effect of reordering
      the column order in segmentMetadata queries, and therefore in SQL
      schemas as well.
    
    Follow-up to #16849.
    
    * Fix compilation.
    
    * Additional fixes.
    
    * Fix.
    
    * Fix style.
    
    * Omit nonexistent columns from the row signature.
    
    * Fix tests.
---
 .../org/apache/druid/msq/exec/MSQReplaceTest.java  | 28 ++++++++--------
 .../org/apache/druid/msq/test/MSQTestBase.java     | 22 +++++++------
 .../overlord/sampler/InputSourceSampler.java       | 16 ++++------
 .../apache/druid/query/scan/ScanQueryEngine.java   | 13 +++-----
 .../druid/segment/FilteredStorageAdapter.java      |  7 ++++
 .../segment/QueryableIndexStorageAdapter.java      | 37 ++++++++++++++++++++++
 .../org/apache/druid/segment/StorageAdapter.java   |  7 ++--
 .../apache/druid/segment/UnnestStorageAdapter.java | 18 +++++++++++
 .../apache/druid/segment/column/ColumnType.java    |  6 ++++
 .../IncrementalIndexStorageAdapter.java            | 14 ++++++++
 .../join/HashJoinSegmentStorageAdapter.java        | 20 ++++++++++++
 .../join/table/BroadcastSegmentIndexedTable.java   |  8 +----
 .../org/apache/druid/frame/key/KeyTestUtils.java   | 10 +++---
 .../druid/query/lookup/LookupSegmentTest.java      | 14 ++++++++
 .../apache/druid/segment/IndexMergerTestBase.java  | 20 ++++++------
 .../join/HashJoinSegmentStorageAdapterTest.java    | 25 +++++++++++++++
 .../table/BroadcastSegmentIndexedTableTest.java    |  6 ++--
 .../apache/druid/segment/realtime/sink/Sink.java   | 23 +++++++-------
 18 files changed, 211 insertions(+), 83 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 96da93c34d0..e470a951877 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -279,8 +279,8 @@ public class MSQReplaceTest extends MSQTestBase
   public void testReplaceOnFooWithAllClusteredByDimExplicitSort(String 
contextName, Map<String, Object> context)
   {
     RowSignature rowSignature = RowSignature.builder()
-                                            .add("__time", ColumnType.LONG)
                                             .add("dim1", ColumnType.STRING)
+                                            .add("__time", ColumnType.LONG)
                                             .add("m1", ColumnType.FLOAT)
                                             .build();
 
@@ -323,12 +323,12 @@ public class MSQReplaceTest extends MSQTestBase
                      .setExpectedShardSpec(DimensionRangeShardSpec.class)
                      .setExpectedResultRows(
                          ImmutableList.of(
-                             new Object[]{946684800000L, 
NullHandling.sqlCompatible() ? "" : null, 1.0f},
-                             new Object[]{978307200000L, "1", 4.0f},
-                             new Object[]{946771200000L, "10.1", 2.0f},
-                             new Object[]{946857600000L, "2", 3.0f},
-                             new Object[]{978480000000L, "abc", 6.0f},
-                             new Object[]{978393600000L, "def", 5.0f}
+                             new Object[]{NullHandling.sqlCompatible() ? "" : 
null, 946684800000L, 1.0f},
+                             new Object[]{"1", 978307200000L, 4.0f},
+                             new Object[]{"10.1", 946771200000L, 2.0f},
+                             new Object[]{"2", 946857600000L, 3.0f},
+                             new Object[]{"abc", 978480000000L, 6.0f},
+                             new Object[]{"def", 978393600000L, 5.0f}
                          )
                      )
                      
.setExpectedSegmentGenerationProgressCountersForStageWorker(
@@ -365,8 +365,8 @@ public class MSQReplaceTest extends MSQTestBase
     // forceSegmentSortByTime = false. (Same expectations as the prior test,
     // testReplaceOnFooWithAllClusteredByDimExplicitSort.)
     RowSignature rowSignature = RowSignature.builder()
-                                            .add("__time", ColumnType.LONG)
                                             .add("dim1", ColumnType.STRING)
+                                            .add("__time", ColumnType.LONG)
                                             .add("m1", ColumnType.FLOAT)
                                             .build();
 
@@ -409,12 +409,12 @@ public class MSQReplaceTest extends MSQTestBase
                      .setExpectedShardSpec(DimensionRangeShardSpec.class)
                      .setExpectedResultRows(
                          ImmutableList.of(
-                             new Object[]{946684800000L, 
NullHandling.sqlCompatible() ? "" : null, 1.0f},
-                             new Object[]{978307200000L, "1", 4.0f},
-                             new Object[]{946771200000L, "10.1", 2.0f},
-                             new Object[]{946857600000L, "2", 3.0f},
-                             new Object[]{978480000000L, "abc", 6.0f},
-                             new Object[]{978393600000L, "def", 5.0f}
+                             new Object[]{NullHandling.sqlCompatible() ? "" : 
null, 946684800000L, 1.0f},
+                             new Object[]{"1", 978307200000L, 4.0f},
+                             new Object[]{"10.1", 946771200000L, 2.0f},
+                             new Object[]{"2", 946857600000L, 3.0f},
+                             new Object[]{"abc", 978480000000L, 6.0f},
+                             new Object[]{"def", 978393600000L, 5.0f}
                          )
                      )
                      
.setExpectedSegmentGenerationProgressCountersForStageWorker(
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 5076e9f3d44..258b5c97751 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -141,6 +141,7 @@ import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -1288,17 +1289,20 @@ public class MSQTestBase extends BaseCalciteQueryTest
           Assert.assertEquals(expectedDestinationIntervals, 
destination.getReplaceTimeChunks());
         }
         if (expectedSegments != null) {
+          final int timeIndex =
+              
MSQResultsReport.ColumnAndType.toRowSignature(expectedRowSignature)
+                                            
.indexOf(ColumnHolder.TIME_COLUMN_NAME);
           Assert.assertEquals(expectedSegments, 
segmentIdVsOutputRowsMap.keySet());
           for (Object[] row : transformedOutputRows) {
-            List<SegmentId> diskSegmentList = segmentIdVsOutputRowsMap.keySet()
-                                                                      .stream()
-                                                                      
.filter(segmentId -> segmentId.getInterval()
-                                                                               
                     .contains((Long) row[0]))
-                                                                      
.filter(segmentId -> {
-                                                                        
List<List<Object>> lists = segmentIdVsOutputRowsMap.get(segmentId);
-                                                                        return 
lists.contains(Arrays.asList(row));
-                                                                      })
-                                                                      
.collect(Collectors.toList());
+            List<SegmentId> diskSegmentList = segmentIdVsOutputRowsMap
+                .keySet()
+                .stream()
+                .filter(segmentId -> segmentId.getInterval().contains((Long) 
row[timeIndex]))
+                .filter(segmentId -> {
+                  List<List<Object>> lists = 
segmentIdVsOutputRowsMap.get(segmentId);
+                  return lists.contains(Arrays.asList(row));
+                })
+                .collect(Collectors.toList());
             if (diskSegmentList.size() != 1) {
               throw new IllegalStateException("Single key in multiple 
partitions");
             }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index 4e5e8634f18..f98287124ed 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -241,14 +241,12 @@ public class InputSourceSampler
         List<DimensionSchema> physicalDimensionSchemas = new ArrayList<>();
 
         RowSignature.Builder signatureBuilder = RowSignature.builder();
-        signatureBuilder.add(
-            ColumnHolder.TIME_COLUMN_NAME,
-            
index.getColumnCapabilities(ColumnHolder.TIME_COLUMN_NAME).toColumnType()
-        );
-        for (IncrementalIndex.DimensionDesc dimensionDesc : 
index.getDimensions()) {
-          if 
(!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionDesc.getName())) {
-            final ColumnType columnType = 
dimensionDesc.getCapabilities().toColumnType();
-            signatureBuilder.add(dimensionDesc.getName(), columnType);
+        for (final String dimensionName : index.getDimensionNames(true)) {
+          if (ColumnHolder.TIME_COLUMN_NAME.equals(dimensionName)) {
+            signatureBuilder.addTimeColumn();
+          } else if 
(!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionName)) {
+            final IncrementalIndex.DimensionDesc dimensionDesc = 
index.getDimension(dimensionName);
+            signatureBuilder.add(dimensionDesc.getName(), 
ColumnType.fromCapabilities(dimensionDesc.getCapabilities()));
             // use explicitly specified dimension schema if it exists
             if (dataSchema != null &&
                 dataSchema.getDimensionsSpec() != null &&
@@ -271,7 +269,7 @@ public class InputSourceSampler
           if 
(!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(aggregatorFactory.getName())) {
             signatureBuilder.add(
                 aggregatorFactory.getName(),
-                
index.getColumnCapabilities(aggregatorFactory.getName()).toColumnType()
+                
ColumnType.fromCapabilities(index.getColumnCapabilities(aggregatorFactory.getName()))
             );
           }
         }
diff --git 
a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java 
b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
index ca2c2725c6f..9dd72d15fa4 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
@@ -45,6 +45,7 @@ import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.filter.Filters;
 import org.apache.druid.timeline.SegmentId;
@@ -98,13 +99,11 @@ public class ScanQueryEngine
     } else {
       final Set<String> availableColumns = Sets.newLinkedHashSet(
           Iterables.concat(
-              Collections.singleton(ColumnHolder.TIME_COLUMN_NAME),
+              adapter.getRowSignature().getColumnNames(),
               Iterables.transform(
                   Arrays.asList(query.getVirtualColumns().getVirtualColumns()),
                   VirtualColumn::getOutputName
-              ),
-              adapter.getAvailableDimensions(),
-              adapter.getAvailableMetrics()
+              )
           )
       );
 
@@ -152,11 +151,7 @@ public class ScanQueryEngine
             for (String column : allColumns) {
               final BaseObjectColumnValueSelector selector = 
factory.makeColumnValueSelector(column);
               ColumnCapabilities columnCapabilities = 
factory.getColumnCapabilities(column);
-              rowSignatureBuilder.add(
-                  column,
-                  columnCapabilities == null ? null : 
columnCapabilities.toColumnType()
-              );
-
+              rowSignatureBuilder.add(column, 
ColumnType.fromCapabilities(columnCapabilities));
               columnSelectors.add(selector);
             }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/FilteredStorageAdapter.java 
b/processing/src/main/java/org/apache/druid/segment/FilteredStorageAdapter.java
index 3fcfe2dd78a..46b9f5fe061 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/FilteredStorageAdapter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/FilteredStorageAdapter.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.filter.Filter;
 import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.filter.AndFilter;
 import org.joda.time.Interval;
@@ -69,6 +70,12 @@ public class FilteredStorageAdapter implements StorageAdapter
     return baseStorageAdapter.getInterval();
   }
 
+  @Override
+  public RowSignature getRowSignature()
+  {
+    return baseStorageAdapter.getRowSignature();
+  }
+
   @Override
   public Indexed<String> getAvailableDimensions()
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java
 
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java
index e2c88071607..a0e23e71f74 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java
@@ -19,11 +19,14 @@
 
 package org.apache.druid.segment;
 
+import org.apache.druid.query.OrderBy;
 import org.apache.druid.segment.column.BaseColumn;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.DictionaryEncodedColumn;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.data.Indexed;
 import 
org.apache.druid.segment.index.semantic.DictionaryEncodedStringValueIndex;
 import org.joda.time.DateTime;
@@ -55,6 +58,40 @@ public class QueryableIndexStorageAdapter implements 
StorageAdapter
     return index.getDataInterval();
   }
 
+  @Override
+  public RowSignature getRowSignature()
+  {
+    final LinkedHashSet<String> columns = new LinkedHashSet<>();
+
+    for (final OrderBy orderBy : index.getOrdering()) {
+      columns.add(orderBy.getColumnName());
+    }
+
+    // Add __time after the defined ordering, if __time wasn't part of it.
+    columns.add(ColumnHolder.TIME_COLUMN_NAME);
+
+    for (final String dimension : getAvailableDimensions()) {
+      columns.add(dimension);
+    }
+
+    for (final String metric : getAvailableMetrics()) {
+      columns.add(metric);
+    }
+
+    final RowSignature.Builder builder = RowSignature.builder();
+    for (final String column : columns) {
+      final ColumnType columnType = 
ColumnType.fromCapabilities(index.getColumnCapabilities(column));
+
+      // index.getOrdering() may include columns that don't exist, such as if 
they were omitted due to
+      // being 100% nulls. Don't add those to the row signature.
+      if (columnType != null) {
+        builder.add(column, columnType);
+      }
+    }
+
+    return builder.build();
+  }
+
   @Override
   public Indexed<String> getAvailableDimensions()
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java 
b/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java
index 3ad6c930002..c0949692e4f 100644
--- a/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java
+++ b/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java
@@ -26,6 +26,7 @@ import 
org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.OrderBy;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.vector.VectorCursor;
@@ -34,7 +35,6 @@ import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.List;
-import java.util.Optional;
 
 /**
  *
@@ -140,10 +140,7 @@ public interface StorageAdapter extends CursorFactory, 
ColumnInspector, CursorHo
     builder.addTimeColumn();
 
     for (final String column : Iterables.concat(getAvailableDimensions(), 
getAvailableMetrics())) {
-      builder.add(
-          column,
-          
Optional.ofNullable(getColumnCapabilities(column)).map(ColumnCapabilities::toColumnType).orElse(null)
-      );
+      builder.add(column, 
ColumnType.fromCapabilities(getColumnCapabilities(column)));
     }
 
     return builder.build();
diff --git 
a/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java 
b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
index 435771b4780..2f9552a1d3c 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
@@ -35,6 +35,8 @@ import org.apache.druid.query.filter.NullFilter;
 import org.apache.druid.query.filter.RangeFilter;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.TypeSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.Indexed;
@@ -169,6 +171,22 @@ public class UnnestStorageAdapter implements StorageAdapter
     return baseAdapter.getInterval();
   }
 
+  @Override
+  public RowSignature getRowSignature()
+  {
+    final RowSignature.Builder builder = RowSignature.builder();
+
+    final RowSignature baseSignature = baseAdapter.getRowSignature();
+    for (int i = 0; i < baseSignature.size(); i++) {
+      final String column = baseSignature.getColumnName(i);
+      if (!outputColumnName.equals(column)) {
+        builder.add(column, 
ColumnType.fromCapabilities(getColumnCapabilities(column)));
+      }
+    }
+
+    return builder.add(outputColumnName, 
ColumnType.fromCapabilities(getColumnCapabilities(outputColumnName))).build();
+  }
+
   @Override
   public Indexed<String> getAvailableDimensions()
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java 
b/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java
index dbfed07749c..b670d8f4370 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java
@@ -124,6 +124,12 @@ public class ColumnType extends 
BaseTypeSignature<ValueType>
     return Types.fromString(ColumnTypeFactory.getInstance(), typeName);
   }
 
+  @Nullable
+  public static ColumnType fromCapabilities(@Nullable ColumnCapabilities 
capabilities)
+  {
+    return capabilities != null ? capabilities.toColumnType() : null;
+  }
+
   public static ColumnType ofArray(ColumnType elementType)
   {
     return ColumnTypeFactory.getInstance().ofArray(elementType);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
index ab92a00897b..c9a6d209697 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.incremental;
 
+import com.google.common.collect.Iterables;
 import org.apache.druid.segment.CursorBuildSpec;
 import org.apache.druid.segment.CursorHolder;
 import org.apache.druid.segment.DimensionDictionarySelector;
@@ -30,6 +31,7 @@ import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.data.ListIndexed;
 import org.joda.time.Interval;
@@ -122,6 +124,18 @@ public class IncrementalIndexStorageAdapter implements 
StorageAdapter
     return index.getInterval();
   }
 
+  @Override
+  public RowSignature getRowSignature()
+  {
+    final RowSignature.Builder builder = RowSignature.builder();
+
+    for (final String column : Iterables.concat(index.getDimensionNames(true), 
index.getMetricNames())) {
+      builder.add(column, 
ColumnType.fromCapabilities(index.getColumnCapabilities(column)));
+    }
+
+    return builder.build();
+  }
+
   @Override
   public Indexed<String> getAvailableDimensions()
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java
 
b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java
index f3668ca45e1..765ab4fbd91 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java
@@ -33,6 +33,8 @@ import org.apache.druid.segment.Metadata;
 import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.data.ListIndexed;
 import org.apache.druid.segment.filter.Filters;
@@ -99,6 +101,24 @@ public class HashJoinSegmentStorageAdapter implements 
StorageAdapter
     return baseAdapter.getInterval();
   }
 
+  @Override
+  public RowSignature getRowSignature()
+  {
+    // Use a Set since we may encounter duplicates, if a field from a Joinable 
shadows one of the base fields.
+    final LinkedHashSet<String> columns = new 
LinkedHashSet<>(baseAdapter.getRowSignature().getColumnNames());
+
+    for (final JoinableClause clause : clauses) {
+      columns.addAll(clause.getAvailableColumnsPrefixed());
+    }
+
+    final RowSignature.Builder builder = RowSignature.builder();
+    for (final String column : columns) {
+      builder.add(column, 
ColumnType.fromCapabilities(getColumnCapabilities(column)));
+    }
+
+    return builder.build();
+  }
+
   @Override
   public Indexed<String> getAvailableDimensions()
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
 
b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
index 9591dc315f0..5fe6d8a698a 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
@@ -39,7 +39,6 @@ import org.apache.druid.segment.QueryableIndexStorageAdapter;
 import org.apache.druid.segment.SimpleAscendingOffset;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.BaseColumn;
-import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.data.ReadableOffset;
@@ -88,12 +87,7 @@ public class BroadcastSegmentIndexedTable implements 
IndexedTable
         segment.getId()
     );
 
-    RowSignature.Builder sigBuilder = RowSignature.builder();
-    sigBuilder.add(ColumnHolder.TIME_COLUMN_NAME, ColumnType.LONG);
-    for (String column : queryableIndex.getColumnNames()) {
-      sigBuilder.add(column, 
adapter.getColumnCapabilities(column).toColumnType());
-    }
-    this.rowSignature = sigBuilder.build();
+    this.rowSignature = adapter.getRowSignature();
 
     // initialize keycolumn index builders
     final ArrayList<RowBasedIndexBuilder> indexBuilders = new 
ArrayList<>(rowSignature.size());
diff --git 
a/processing/src/test/java/org/apache/druid/frame/key/KeyTestUtils.java 
b/processing/src/test/java/org/apache/druid/frame/key/KeyTestUtils.java
index e5ab3c203f4..cae9439a331 100644
--- a/processing/src/test/java/org/apache/druid/frame/key/KeyTestUtils.java
+++ b/processing/src/test/java/org/apache/druid/frame/key/KeyTestUtils.java
@@ -30,13 +30,11 @@ import org.apache.druid.frame.write.FrameWriters;
 import org.apache.druid.frame.write.RowBasedFrameWriter;
 import org.apache.druid.segment.ColumnInspector;
 import org.apache.druid.segment.RowBasedColumnSelectorFactory;
-import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 public class KeyTestUtils
 {
@@ -56,10 +54,10 @@ public class KeyTestUtils
     final RowSignature.Builder builder = RowSignature.builder();
 
     for (final KeyColumn keyColumn : keyColumns) {
-      final ColumnCapabilities capabilities = 
inspector.getColumnCapabilities(keyColumn.columnName());
-      final ColumnType columnType =
-          
Optional.ofNullable(capabilities).map(ColumnCapabilities::toColumnType).orElse(null);
-      builder.add(keyColumn.columnName(), columnType);
+      builder.add(
+          keyColumn.columnName(),
+          
ColumnType.fromCapabilities(inspector.getColumnCapabilities(keyColumn.columnName()))
+      );
     }
 
     return builder.build();
diff --git 
a/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java 
b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java
index 5826301bcd7..142b23b8391 100644
--- 
a/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java
@@ -32,6 +32,8 @@ import org.apache.druid.segment.CursorHolder;
 import org.apache.druid.segment.DimensionDictionarySelector;
 import org.apache.druid.segment.RowBasedStorageAdapter;
 import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.timeline.SegmentId;
 import org.hamcrest.CoreMatchers;
@@ -119,6 +121,18 @@ public class LookupSegmentTest
     Assert.assertNull(LOOKUP_SEGMENT.asQueryableIndex());
   }
 
+  @Test
+  public void test_asStorageAdapter_getRowSignature()
+  {
+    Assert.assertEquals(
+        RowSignature.builder()
+                    .add("k", ColumnType.STRING)
+                    .add("v", ColumnType.STRING)
+                    .build(),
+        LOOKUP_SEGMENT.asStorageAdapter().getRowSignature()
+    );
+  }
+
   @Test
   public void test_asStorageAdapter_getAvailableDimensions()
   {
diff --git 
a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java 
b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
index 1f46534266f..b8c9721b5a0 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
@@ -255,12 +255,12 @@ public class IndexMergerTestBase extends 
InitializedNullHandlingTest
     Assert.assertEquals(6, index.getNumRows());
     Assert.assertEquals(
         ImmutableList.of(
-            ImmutableList.of(timestamp, "1", "2", 1L),
-            ImmutableList.of(timestamp, "1", "2", 1L),
-            ImmutableList.of(timestamp + 1, "1", "2", 1L),
-            ImmutableList.of(timestamp, "3", "4", 1L),
-            ImmutableList.of(timestamp, "3", "4", 1L),
-            ImmutableList.of(timestamp + 1, "3", "4", 1L)
+            ImmutableList.of("1", "2", timestamp, 1L),
+            ImmutableList.of("1", "2", timestamp, 1L),
+            ImmutableList.of("1", "2", timestamp + 1, 1L),
+            ImmutableList.of("3", "4", timestamp, 1L),
+            ImmutableList.of("3", "4", timestamp, 1L),
+            ImmutableList.of("3", "4", timestamp + 1, 1L)
         ),
         FrameTestUtil.readRowsFromAdapter(new 
QueryableIndexStorageAdapter(index), null, false).toList()
     );
@@ -325,10 +325,10 @@ public class IndexMergerTestBase extends 
InitializedNullHandlingTest
     Assert.assertEquals(4, index.getNumRows());
     Assert.assertEquals(
         ImmutableList.of(
-            ImmutableList.of(timestamp, "1", "2", 2L),
-            ImmutableList.of(timestamp + 1, "1", "2", 1L),
-            ImmutableList.of(timestamp, "3", "4", 2L),
-            ImmutableList.of(timestamp + 1, "3", "4", 1L)
+            ImmutableList.of("1", "2", timestamp, 2L),
+            ImmutableList.of("1", "2", timestamp + 1, 1L),
+            ImmutableList.of("3", "4", timestamp, 2L),
+            ImmutableList.of("3", "4", timestamp + 1, 1L)
         ),
         FrameTestUtil.readRowsFromAdapter(new 
QueryableIndexStorageAdapter(index), null, false).toList()
     );
diff --git a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java
index b26da1ceba7..7ae04d9fcb1 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java
@@ -58,6 +58,31 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
     );
   }
 
+  @Test
+  public void test_getRowSignature_factToCountry()
+  {
+    Assert.assertEquals(
+        ImmutableList.of(
+            "__time",
+            "channel",
+            "regionIsoCode",
+            "countryNumber",
+            "countryIsoCode",
+            "user",
+            "isRobot",
+            "isAnonymous",
+            "namespace",
+            "page",
+            "delta",
+            "channel_uniques",
+            "c1.countryNumber",
+            "c1.countryIsoCode",
+            "c1.countryName"
+        ),
+        
Lists.newArrayList(makeFactToCountrySegment().getRowSignature().getColumnNames())
+    );
+  }
+
   @Test
   public void test_getAvailableDimensions_factToCountry()
   {
diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java
index 73daadc7dd1..9b8f7f4d916 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java
@@ -43,6 +43,7 @@ import org.apache.druid.segment.IndexMerger;
 import org.apache.druid.segment.IndexMergerV9;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.segment.SimpleAscendingOffset;
 import org.apache.druid.segment.TestIndex;
@@ -141,9 +142,8 @@ public class BroadcastSegmentIndexedTableTest extends InitializedNullHandlingTes
         segment.getTotalSpace()
     );
     backingSegment = (QueryableIndexSegment) factory.factorize(dataSegment, 
segment, false, SegmentLazyLoadFailCallback.NOOP);
-
-    columnNames = 
ImmutableList.<String>builder().add(ColumnHolder.TIME_COLUMN_NAME)
-                                                 
.addAll(backingSegment.asQueryableIndex().getColumnNames()).build();
+    columnNames =
+        new 
QueryableIndexStorageAdapter(backingSegment.asQueryableIndex()).getRowSignature().getColumnNames();
     broadcastTable = new BroadcastSegmentIndexedTable(backingSegment, 
keyColumns, dataSegment.getVersion());
   }
 
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
index 8d4fc0dd7f8..5e59c0e9b15 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
@@ -24,6 +24,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
@@ -37,7 +38,6 @@ import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.column.ColumnFormat;
-import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
@@ -92,6 +92,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
   private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
 
   // columns excluding current index (the in-memory fire hydrant), includes 
__time column
+  @GuardedBy("hydrantLock")
   private final LinkedHashSet<String> columnsExcludingCurrIndex = new 
LinkedHashSet<>();
 
   // column types for columns in {@code columnsExcludingCurrIndex}
@@ -397,6 +398,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
   /**
    * Merge the column from the index with the existing columns.
    */
+  @GuardedBy("hydrantLock")
   private void overwriteIndexDimensions(StorageAdapter storageAdapter)
   {
     RowSignature rowSignature = storageAdapter.getRowSignature();
@@ -414,20 +416,19 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
     synchronized (hydrantLock) {
       RowSignature.Builder builder = RowSignature.builder();
 
-      builder.addTimeColumn();
-
+      // Add columns from columnsExcludingCurrIndex.
       for (String dim : columnsExcludingCurrIndex) {
-        if (!ColumnHolder.TIME_COLUMN_NAME.equals(dim)) {
-          builder.add(dim, columnTypeExcludingCurrIndex.get(dim));
-        }
+        builder.add(dim, columnTypeExcludingCurrIndex.get(dim));
       }
 
-      IncrementalIndexStorageAdapter incrementalIndexStorageAdapter = new 
IncrementalIndexStorageAdapter(currHydrant.getIndex());
-      RowSignature incrementalIndexSignature = 
incrementalIndexStorageAdapter.getRowSignature();
+      // Add columns from the currHydrant that do not yet exist in 
columnsExcludingCurrIndex.
+      IncrementalIndexStorageAdapter currStorageAdapter =
+          new IncrementalIndexStorageAdapter(currHydrant.getIndex());
+      RowSignature currSignature = currStorageAdapter.getRowSignature();
 
-      for (String dim : incrementalIndexSignature.getColumnNames()) {
-        if (!columnsExcludingCurrIndex.contains(dim) && 
!ColumnHolder.TIME_COLUMN_NAME.equals(dim)) {
-          builder.add(dim, 
incrementalIndexSignature.getColumnType(dim).orElse(null));
+      for (String dim : currSignature.getColumnNames()) {
+        if (!columnsExcludingCurrIndex.contains(dim)) {
+          builder.add(dim, currSignature.getColumnType(dim).orElse(null));
         }
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to